Challenges using Flink REST API


Wouter Zorgdrager-2
Hey all!

I'm looking for some advice on the following. I'm working on an abstraction on top of Apache Flink to 'pipeline' Flink applications using Kafka. For deployment this means that all these Flink jobs are embedded into one jar and each job is started using a program argument (e.g. "--stage 'FirstFlinkJob'"). To ease deploying a set of interconnected Flink jobs onto a cluster, I wrote a Python script which basically talks to the REST API of the JobManager. So you can do things like "pipeline start --jar 'JarWithThePipeline.jar'" and this would deploy every Flink application separately.
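
To give an idea of what the script does, here is a minimal sketch of the deployment flow, assuming Flink's /jars/upload and /jars/:jarid/run REST endpoints; the JobManager address, jar path, and stage names are placeholders:

import requests

JOBMANAGER = "http://localhost:8081"  # placeholder address

def start_pipeline(jar_path, stages):
    # Upload the fat jar once; the response contains its server-side name.
    with open(jar_path, "rb") as jar:
        resp = requests.post(JOBMANAGER + "/jars/upload",
                             files={"jarfile": jar})
    resp.raise_for_status()
    jar_id = resp.json()["filename"].split("/")[-1]

    # Submit one job per stage, passing the stage name as a program argument.
    for stage in stages:
        run = requests.post(JOBMANAGER + "/jars/" + jar_id + "/run",
                            json={"programArgs": "--stage '%s'" % stage})
        run.raise_for_status()
        print("Started stage %s as job %s" % (stage, run.json()["jobid"]))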

However, this script was written a while ago against Flink version 1.4.2. This week I tried to upgrade it to the latest Flink version, but I noticed a change in the REST responses. In order to get the "pipeline start" command working, we need to know all the Flink jobs that are in the jar (we call these Flink jobs 'stages'), because we need to pass the stage names as arguments to the jar. For the 1.4.2 version we used a dirty trick: we ran the jar with '--list --asException' as program arguments, which basically runs the jar file and immediately throws an exception containing the stage names. These are then parsed and used to start every stage separately. The error message that Flink threw looked something like this:

java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Could not run the jar.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
... 9 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
... 8 more
Caused by: org.codefeedr.pipeline.PipelineListException: ["org.codefeedr.plugin.twitter.stages.TwitterStatusInput","mongo_tweets","elasticsearch_tweets"]
at org.codefeedr.pipeline.Pipeline.showList(Pipeline.scala:114)
at org.codefeedr.pipeline.Pipeline.start(Pipeline.scala:100)
at nl.wouterr.Main$.main(Main.scala:23)
at nl.wouterr.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
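
The parsing side of this trick is simple: the stage names appear as a JSON array right after the exception name, so the script scrapes them out roughly like this (the regex and helper name here are illustrative, not the exact code):

import json
import re

def extract_stages(error_text):
    # The stage names appear as a JSON array after the exception name.
    match = re.search(r"PipelineListException: (\[.*?\])", error_text)
    if match is None:
        raise ValueError("no stage list found in error response")
    return json.loads(match.group(1))

# Applied to the trace above, extract_stages(response_text) yields:
# ['org.codefeedr.plugin.twitter.stages.TwitterStatusInput',
#  'mongo_tweets', 'elasticsearch_tweets']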

However, for 1.7.0 this trick doesn't work anymore, because instead of returning the full stack trace, it only returns the following:
org.apache.flink.client.program.ProgramInvocationException: The program caused an error: 

In the console of the JobManager it does print the full stack trace, though. So first of all I'm wondering if there might be a way to enable more detailed stack traces in the REST responses for Flink 1.7. If not, do you have any suggestions on how to tackle this problem? I know that in the end this isn't really a Flink problem, but you might know a workaround in the Flink REST client to achieve the same.

Some solutions I already considered:
- Running the jar with '--list --asException' locally through the Python script; however, Flink and Scala are not provided in the jar. Technically I could add them both to the classpath, but this would require users to have the Flink jar locally (and also Scala somewhere, but I assume most do); a rough sketch of that invocation follows this list.
- Letting users provide a list of stage names for all their (interconnected) Flink jobs. This is not really an option, because the (main) idea behind this framework is to reduce the boilerplate and hassle of setting up complex stream processing architectures.
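
For the first option, the invocation would look roughly like this; the main class comes from the stack trace above, while the flink-dist jar location is an assumption:

import subprocess

result = subprocess.run(
    ["java",
     # pipeline jar plus a locally available Flink distribution jar,
     # which also bundles the Scala library (paths are assumptions)
     "-cp", "JarWithThePipeline.jar:/opt/flink/lib/flink-dist_2.11-1.7.2.jar",
     "nl.wouterr.Main",  # main class, taken from the stack trace above
     "--list", "--asException"],
    capture_output=True, text=True)
# The stage list ends up in the stack trace printed to stderr.
print(result.stderr)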

Any help is appreciated. Thanks in advance!

Kind regards,
Wouter Zorgdrager

Re: Challenges using Flink REST API

Chesnay Schepler
You should get the full stacktrace if you upgrade to 1.7.2.

Re: Challenges using Flink REST API

Wouter Zorgdrager-2
Hi Chesnay,

Unfortunately, this is not the case when I run the Flink 1.7.2 Docker images. The response is still:
{
    "errors": [
        "org.apache.flink.client.program.ProgramInvocationException: The main method caused an error."
    ]
}

Regards,
Wouter Zorgdrager

Re: Challenges using Flink REST API

Chesnay Schepler
Can you give me the stacktrace that is logged in the JobManager logs?

Re: Challenges using Flink REST API

Wouter Zorgdrager-2
Hey Chesnay,

Actually, I was mistaken when I said that the JobManager logs show the full stack trace; what I actually got there was the following:
2019-03-13 11:55:13,906 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Exception occurred in REST handler: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.

Some googling led me to this Jira issue [1], which seems to fix my issue in 1.8.0. However, I was still confused about why this ever worked for me in 1.4.2; by checking some binaries I found out that the REST API was reworked for 1.5.0 [2], which removed the full stack trace.

Is there already an (official) Docker image to run Flink 1.8?

Thanks,
Wouter



Re: Challenges using Flink REST API

Chesnay Schepler
My bad, I was looking at the wrong code path. The linked issue isn't helpful, as it only slightly extends the exception message.

You cannot get the stacktrace in 1.7.x nor in the current RC for 1.8.0. I've filed https://issues.apache.org/jira/browse/FLINK-11902 to change this.

The 1.8.0 RC just got cancelled, so I may be able to get this in; no promises though.
