So I am looking at the Flink Management REST API... and, as I see it, there are two paths to rescale a running topology:

1. Stop the topology with a savepoint and then start it up from the new savepoint; or
2. Use the /jobs/:jobid/rescaling endpoint

The first one seems to work just fine. The second one seems to just blow up every time I try to use it... I'll get things like:

The above was for the topology https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-streamingjob-java running with options: --source parallel

Things are even worse with --source iterator, as that has no checkpoint state to recover from.

Right now I am trying to discover what preconditions must be met in order to safely call the rescaling endpoint and actually have it work... I should note that I currently have not managed to get it to work at all!!!

One of the things we are trying to do is add some automation to enable scale-up / down as we see surges in processing load. We want an automated system that responds to those situations on its own for low deltas and triggers an on-call engineer for persistent excess load. In that regard I'd like to know what the automation should check to know whether it can rescale via the dedicated endpoint or whether it should use the reliable (but presumably slower) path of stop with savepoint & start from savepoint.

The https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-streamingjob-java job I have been using is just a quick job to let me test the automation on a local cluster. It is designed to output a strictly increasing sequence of numbers without missing any, optionally double them, and then print them out. The different sources are me experimenting with different types of operator to see what kinds of topology can work with the rescaling endpoint.

Thanks in advance
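The scaling policy described above (handle small deltas automatically, page a human for persistent excess load) can be kept separate from the question of *how* the rescale is executed. A minimal sketch, assuming a hypothetical `ScalingPolicy` class whose thresholds (`maxAutoDelta`, `persistenceLimit`) are illustrative values rather than Flink settings, and assuming the "desired" parallelism has already been derived from load metrics elsewhere:

```java
import java.time.Duration;

/**
 * Hypothetical scaling policy: small parallelism changes are applied
 * automatically; large or persistent excess load pages the on-call engineer.
 * Thresholds are illustrative assumptions, not Flink configuration.
 */
public class ScalingPolicy {

    enum Action { NONE, AUTO_RESCALE, PAGE_ON_CALL }

    private final int maxAutoDelta;          // largest change applied without a human
    private final Duration persistenceLimit; // overload lasting longer than this pages on-call

    ScalingPolicy(int maxAutoDelta, Duration persistenceLimit) {
        this.maxAutoDelta = maxAutoDelta;
        this.persistenceLimit = persistenceLimit;
    }

    /**
     * @param current       parallelism the job is running with now
     * @param desired       parallelism suggested by the load metrics
     * @param overloadedFor how long the job has been over capacity
     */
    Action decide(int current, int desired, Duration overloadedFor) {
        if (desired == current) {
            return Action.NONE;
        }
        // Persistent excess load is escalated regardless of the delta size.
        if (overloadedFor.compareTo(persistenceLimit) > 0) {
            return Action.PAGE_ON_CALL;
        }
        int delta = Math.abs(desired - current);
        return delta <= maxAutoDelta ? Action.AUTO_RESCALE : Action.PAGE_ON_CALL;
    }

    public static void main(String[] args) {
        ScalingPolicy policy = new ScalingPolicy(2, Duration.ofMinutes(30));
        System.out.println(policy.decide(4, 5, Duration.ofMinutes(5)));  // AUTO_RESCALE
        System.out.println(policy.decide(4, 10, Duration.ofMinutes(5))); // PAGE_ON_CALL
        System.out.println(policy.decide(4, 5, Duration.ofHours(1)));    // PAGE_ON_CALL
    }
}
```

Keeping the decision apart from the mechanism means the automation can switch between the rescaling endpoint and stop-with-savepoint without touching the policy.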
The plot thickens... I was able to rescale down... just not back up again!!!

root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink list -m localhost:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
07.02.2020 11:26:33 : ebc20a700c334f61ea03ecdf3d8939ca : Test topology (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 1
Modify job ebc20a700c334f61ea03ecdf3d8939ca.
Rescaled job ebc20a700c334f61ea03ecdf3d8939ca. Its new parallelism is 1.
root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 2
Modify job ebc20a700c334f61ea03ecdf3d8939ca.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not rescale job ebc20a700c334f61ea03ecdf3d8939ca.
    at org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
    at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Suspend needs to happen atomically
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor.aroundReceive(Actor.scala:502)
    at akka.actor.Actor.aroundReceive$(Actor.scala:500)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.IllegalStateException: Suspend needs to happen atomically
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
    at org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
    at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
    ... 20 more

On Fri, 7 Feb 2020 at 11:40, Stephen Connolly <[hidden email]> wrote:
And now the job is stuck in a suspended state, and I seem to have no way to get it out of that state again!

On Fri, 7 Feb 2020 at 11:50, Stephen Connolly <[hidden email]> wrote:
Ooooh, more fun... If I rescale a job down, the job's config at jobs/{jobid}/config does not reflect the new parallelism (there may not even be any way to detect such a parallelism change)... but, more critically, the job is now unstoppable and seems to end up stuck in the CANCELLING state for some time (I gave up waiting).

On Fri, 7 Feb 2020 at 11:54, Stephen Connolly <[hidden email]> wrote:
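Since a stop or cancel can hang in CANCELLING (or leave the job SUSPENDED) as reported above, automation of this kind probably needs a hard deadline on every state transition it waits for, escalating instead of waiting forever. A minimal sketch, assuming the job state comes from a caller-supplied hook (in practice perhaps the `state` field returned by `GET /jobs/:jobid`); the `JobStateWaiter` class and `awaitTerminal` method are hypothetical names:

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

public class JobStateWaiter {

    // Globally terminal Flink job states. SUSPENDED is deliberately excluded:
    // a job left SUSPENDED will not finish on its own, so waiting on it should time out.
    static final Set<String> TERMINAL = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList("FINISHED", "CANCELED", "FAILED")));

    /**
     * Polls the supplied state until it is terminal or the timeout elapses.
     * Returns the terminal state, or empty if the job is still in some other
     * state (e.g. stuck in CANCELLING) when the deadline passes.
     */
    static Optional<String> awaitTerminal(Supplier<String> status,
                                          Duration pollInterval,
                                          Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            String state = status.get();
            if (TERMINAL.contains(state)) {
                return Optional.of(state);
            }
            if (System.nanoTime() - deadline >= 0) {
                return Optional.empty(); // caller should escalate, e.g. page on-call
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return Optional.empty();
            }
        }
    }

    public static void main(String[] args) {
        // Simulated job that passes through CANCELLING and reaches CANCELED.
        Iterator<String> seq = Arrays.asList("RUNNING", "CANCELLING", "CANCELED").iterator();
        Supplier<String> finishing = () -> seq.hasNext() ? seq.next() : "CANCELED";
        System.out.println(awaitTerminal(finishing, Duration.ofMillis(10), Duration.ofSeconds(1))); // Optional[CANCELED]

        // Simulated job stuck in CANCELLING.
        System.out.println(awaitTerminal(() -> "CANCELLING", Duration.ofMillis(10), Duration.ofMillis(50))); // Optional.empty
    }
}
```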
Hi Stephen,

I am sorry that you had this experience with the rescale API. Unfortunately, the rescale API was always experimental and had some flaws. Recently, the Flink community decided to disable it temporarily with the 1.9 release; see more explanation here [1].

I would advise manual rescaling (path 1 in your original message). Technically, the rescale operation skips some steps, like job master startup and uploading job artefacts, but it still does a similar thing to the manual workflow:

1. take a savepoint
2. redeploy the tasks with the new parallelism from that savepoint

So in practice there should not be a big difference, but whether the rescale operation is faster depends on the job, of course.

Thanks,
Andrey

On Fri, Feb 7, 2020 at 1:31 PM Stephen Connolly <[hidden email]> wrote:
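The manual workflow (take a savepoint, then redeploy from it with the new parallelism) maps onto two REST calls plus a poll. The sketch below only builds request descriptions and does not send them. It assumes the Flink 1.9 endpoints `POST /jobs/:jobid/stop`, `GET /jobs/:jobid/savepoints/:triggerid` and `POST /jars/:jarid/run`, and that the job's jar has already been uploaded; the `ManualRescalePlan` and `Call` types, the jar id, the trigger id and the savepoint paths are hypothetical, so check the field names against the REST API docs for your Flink version:

```java
public class ManualRescalePlan {

    /** Minimal description of one HTTP call (hypothetical helper type). */
    static final class Call {
        final String method;
        final String path;
        final String jsonBody; // null when the call has no body

        Call(String method, String path, String jsonBody) {
            this.method = method;
            this.path = path;
            this.jsonBody = jsonBody;
        }

        @Override
        public String toString() {
            return method + " " + path + (jsonBody == null ? "" : " " + jsonBody);
        }
    }

    /** Quotes a value as a JSON string, escaping backslashes and double quotes only. */
    static String jsonString(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Step 1: stop the job with a savepoint; the response carries a trigger id. */
    static Call stopWithSavepoint(String jobId, String targetDirectory) {
        // drain=false: no final watermark is emitted, since the job will be restarted.
        return new Call("POST", "/jobs/" + jobId + "/stop",
                "{\"targetDirectory\":" + jsonString(targetDirectory) + ",\"drain\":false}");
    }

    /** Step 1b: poll this until the savepoint reports COMPLETED, then read its location. */
    static Call savepointStatus(String jobId, String triggerId) {
        return new Call("GET", "/jobs/" + jobId + "/savepoints/" + triggerId, null);
    }

    /** Step 2: resubmit the uploaded jar from the savepoint with the new parallelism. */
    static Call runFromSavepoint(String jarId, String savepointPath, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        return new Call("POST", "/jars/" + jarId + "/run",
                "{\"savepointPath\":" + jsonString(savepointPath) + ",\"parallelism\":" + parallelism + "}");
    }

    public static void main(String[] args) {
        String jobId = "ebc20a700c334f61ea03ecdf3d8939ca";
        System.out.println(stopWithSavepoint(jobId, "file:///tmp/savepoints"));
        // The trigger id, jar id and savepoint path below are placeholders.
        System.out.println(savepointStatus(jobId, "example-trigger-id"));
        System.out.println(runFromSavepoint("example-jar-id", "file:///tmp/savepoints/savepoint-example", 2));
    }
}
```

`drain` is set to false because draining emits a final watermark that fires pending event-time timers, which suits a permanent shutdown rather than a restart at a different parallelism.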