Job submission: Fail using command line. Success using web (flink1.2.0)

Rami Al-Isawi
Hi,

The exact same jar on the same machine deploys just fine in a couple of seconds using the web interface. On the other hand, if I use the command line, I get:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I did increase the timeout, but it fails the same way. 

I assume that the submission method should not be relevant, so what is the difference between command-line and web submission?

I tested removing some subtasks, and that let the command line submit the job successfully. But then how come it worked fine through the web interface with all the subtasks included?

Regards,
-Rami
Disclaimer: This message and any attachments thereto are intended solely for the addressed recipient(s) and may contain confidential information. If you are not the intended recipient, please notify the sender by reply e-mail and delete the e-mail (including any attachments thereto) without producing, distributing or retaining any copies thereof. Any review, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient(s) is prohibited. Thank you.

Re: Job submission: Fail using command line. Success using web (flink1.2.0)

Rami Al-Isawi
Hi Robert,

Yes, there is an OversizedPayloadException in the job manager log:
---------------
2017-05-22 15:39:18,942 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - Upload jar files to job manager akka.tcp://flink@localhost:6123/user/jobmanager.
2017-05-22 15:39:18,957 INFO  org.apache.flink.runtime.blob.BlobClient                      - Blob client connecting to akka.tcp://flink@localhost:6123/user/jobmanager
2017-05-22 15:39:19,451 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - Submit job to the job manager akka.tcp://flink@localhost:6123/user/jobmanager.
2017-05-22 15:39:19,632 ERROR akka.remote.EndpointWriter                                    - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1736150911]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage was 24555177 bytes.
2017-05-22 15:41:19,472 ERROR org.apache.flink.client.CliFrontend                           - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at fwdnxt.Sonar.main(Sonar.java:162)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:294)
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:382)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
... 22 more
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
-------------
Does this help explain why it fails only with the command-line client? How can I fix it, and what does "JobManagerMessages$LeaderSessionMessage was 24555177 bytes" mean?


Regards,
-Rami
On 12 May 2017, at 19:21, Robert Metzger <[hidden email]> wrote:

Hi,
Did you check the jobmanager log for any incoming messages? It would be interesting to see whether the JM failed after the initial akka message, or whether there was some kind of hiccup.

On Thu, May 11, 2017 at 5:07 PM, Rami Al-Isawi <[hidden email]> wrote:
Hi,

The exact same jar on the same machine deploys just fine in a couple of seconds using the web interface. On the other hand, if I use the command line, I get:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I did increase the timeout, but it fails the same way. 

I assume that the submission method should not be relevant, so what is the difference between command-line and web submission?

I tested removing some subtasks, and that let the command line submit the job successfully. But then how come it worked fine through the web interface with all the subtasks included?

Regards,
-Rami

Re: Job submission: Fail using command line. Success using web (flink1.2.0)

rmetzger0
Hi Rami,

I think the problem is that when you submit your job through the web interface, Akka does not use remoting (= it does not send your message to a remote machine). When you submit your job from the command-line client, it goes through Akka's network stack (= remoting).
Akka rejects remote messages above a certain size, and it looks like your job exceeds that size: your log shows a maximum allowed size of 10485760 bytes, while the submitted message was 24555177 bytes.
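
To put numbers on the size limit, here is a minimal sketch that pulls the two sizes out of the OversizedPayloadException line from the log and rounds the actual size up to whole MiB. The class and method names (`FrameSizeCheck`, `parseSizes`, `roundUpToMiB`) are made up for illustration and are not part of Flink or Akka. It also assumes that the limit in question is the one controlled by Flink's `akka.framesize` option, which this thread does not confirm, so please check the configuration docs for your version.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FrameSizeCheck {
    // Matches the two byte counts in Akka's "Discarding oversized payload" message.
    private static final Pattern SIZES =
        Pattern.compile("max allowed size (\\d+) bytes.* was (\\d+) bytes");

    /** Returns {maxAllowed, actual} in bytes, or null if the line does not match. */
    public static long[] parseSizes(String logLine) {
        Matcher m = SIZES.matcher(logLine);
        if (!m.find()) {
            return null;
        }
        return new long[] { Long.parseLong(m.group(1)), Long.parseLong(m.group(2)) };
    }

    /** Smallest whole number of MiB that can hold the given number of bytes. */
    public static long roundUpToMiB(long bytes) {
        long mib = 1024L * 1024L;
        return (bytes + mib - 1) / mib;
    }

    public static void main(String[] args) {
        // The exception line as it appears in the job manager log in this thread.
        String line = "akka.remote.OversizedPayloadException: Discarding oversized payload sent to "
            + "Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1736150911]: "
            + "max allowed size 10485760 bytes, actual size of encoded class "
            + "org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage "
            + "was 24555177 bytes.";

        long[] sizes = parseSizes(line);
        if (sizes == null) {
            System.out.println("Line does not look like an OversizedPayloadException message.");
            return;
        }
        System.out.println("limit:  " + sizes[0] + " bytes, " + roundUpToMiB(sizes[0]) + " MiB");
        System.out.println("actual: " + sizes[1] + " bytes, fits in " + roundUpToMiB(sizes[1]) + " MiB");
    }
}
```

If the assumption about `akka.framesize` holds, the output suggests setting it in `flink-conf.yaml` to something comfortably above the actual message size and restarting the cluster. Reducing the size of the job itself, for example by not embedding large objects in the operators, would be the other direction to try.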

