Cannot modify parallelism (rescale job) more than once

Pankaj Chand
Hello,

I am trying to modify the parallelism of a streaming Flink job (the wiki-edits example) multiple times on a standalone cluster (one local machine) with two TaskManagers of 3 slots each (i.e., 6 slots in total). However, the "modify" command only works once (e.g., when I change the parallelism from 2 to 4). The second time (e.g., changing the parallelism to 6, or even back to 2), it fails with an error.

I am using Flink 1.8.1 (because the command for modifying parallelism has been removed from the v1.9 documentation) and have configured savepoints to be written to file:///home/pankaj/flink-checkpoints. The output of the first "modify <jobid> -p 4" command and of the second "modify <jobid> -p 6" command is copied below.

Please tell me how I can modify the parallelism multiple times at runtime.

Thanks,

Pankaj


$ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 4
Modify job 94831ca34951975dbee3335a384ee935.
Rescaled job 94831ca34951975dbee3335a384ee935. Its new parallelism is 4.

$ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 6
Modify job 94831ca34951975dbee3335a384ee935.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not rescale job 94831ca34951975dbee3335a384ee935.
at org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Suspend needs to happen atomically
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:961)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor.aroundReceive(Actor.scala:502)
at akka.actor.Actor.aroundReceive$(Actor.scala:500)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.IllegalStateException: Suspend needs to happen atomically
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
at org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
... 20 more
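Neither reply in this thread offers a workaround. One commonly used alternative on Flink 1.8, which avoids "modify" entirely, is to cancel the job with a savepoint and then resubmit it from that savepoint with the new parallelism. The following is a minimal sketch under stated assumptions: it reuses the savepoint directory from the post above, while the helper function names and the jar path are hypothetical placeholders. It does not fix the "modify" failure itself; it only sidesteps it.

```shell
#!/usr/bin/env sh
# Sketch (assumption, not from this thread): rescale a Flink 1.8 job by
# cancelling it with a savepoint and resubmitting it from that savepoint.
set -eu

# Path to the Flink CLI; override by setting FLINK before sourcing.
FLINK="${FLINK:-./bin/flink}"
# Savepoint target directory, as configured in the original post.
SAVEPOINT_DIR="file:///home/pankaj/flink-checkpoints"

# Hypothetical helper. Usage: cancel_with_savepoint <jobid>
# Takes a savepoint into SAVEPOINT_DIR and then cancels the job;
# the CLI prints the path of the savepoint it wrote.
cancel_with_savepoint() {
  "$FLINK" cancel -s "$SAVEPOINT_DIR" "$1"
}

# Hypothetical helper. Usage: resume_with_parallelism <savepoint-path> <parallelism> <jar>
# Resubmits the job in detached mode (-d), restoring state from the
# savepoint (-s) with the requested parallelism (-p).
resume_with_parallelism() {
  "$FLINK" run -d -s "$1" -p "$2" "$3"
}

# Example (the savepoint path and jar path are placeholders):
#   cancel_with_savepoint 94831ca34951975dbee3335a384ee935
#   resume_with_parallelism <savepoint-path-printed-above> 6 <path-to-wiki-edits-jar>
```

Each resubmission starts a new job with a new job ID, so the cancel/resume cycle can be repeated as often as needed, provided the cluster has enough free slots (6 in this setup) and the new parallelism does not exceed the job's maximum parallelism.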

Re: Cannot modify parallelism (rescale job) more than once

vino yang
Hi Pankaj,

It seems to be a bug. You can report it by opening a Jira issue.

Best,
Vino

On Mon, Oct 28, 2019 at 10:51 AM Pankaj Chand <[hidden email]> wrote:

Re: Cannot modify parallelism (rescale job) more than once

Pankaj Chand
Thank you!

On Mon, Oct 28, 2019 at 3:53 AM vino yang <[hidden email]> wrote: