I'm looking for some advice on the following: I'm working
on an abstraction on top of Apache Flink to 'pipeline' Flink
applications using Kafka. For deployment this means that all
these Flink jobs are embedded into one jar, and each job is
started using a program argument (e.g. --stage
'FirstFlinkJob'). To ease deploying a set of interconnected
Flink jobs onto a cluster, I wrote a Python script that
basically talks to the REST API of the JobManager.
So you can do things like "pipeline start --jar
'JarWithThePipeline.jar'", and this deploys every Flink
application separately.
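For context, the relevant part of that script boils down to
something like the following (a minimal sketch using the
requests library; the JobManager address and helper name are
assumptions, and the exact run-parameter names differ between
Flink versions):

import requests

JOBMANAGER = "http://localhost:8081"  # hypothetical JobManager REST address

def start_pipeline(jar_path, stages):
    """Upload the fat jar once and submit one Flink job per stage."""
    # POST /jars/upload expects a multipart upload of the jar file
    with open(jar_path, "rb") as jar:
        response = requests.post(
            f"{JOBMANAGER}/jars/upload",
            files={"jarfile": ("pipeline.jar", jar, "application/x-java-archive")},
        )
    response.raise_for_status()
    # The response contains the server-side path of the uploaded jar;
    # its basename is the jar id used by the run endpoint
    jar_id = response.json()["filename"].split("/")[-1]

    # Submit one run per stage; each run becomes a separate Flink job.
    # Newer Flink versions accept a JSON body; 1.4.x used the
    # 'program-args' query parameter instead.
    for stage in stages:
        run = requests.post(
            f"{JOBMANAGER}/jars/{jar_id}/run",
            json={"programArgs": f"--stage '{stage}'"},
        )
        run.raise_for_status()
        print(f"Started stage {stage}: job {run.json()['jobid']}")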
However, this script was written a while ago against Flink
version 1.4.2. This week I tried to upgrade it to the latest
Flink version, but I noticed a change in the REST responses.
In order to get the "pipeline start" command working, we need
to know all the Flink jobs that are in the jar (we call these
Flink jobs 'stages'), because we need to pass the stage names
as arguments to the jar. For the 1.4.2 version we used a dirty
trick: we ran the jar with '--list --asException' as program
arguments, which basically runs the jar file and immediately
throws an exception containing the stage names. These are then
parsed and used to start every stage separately. The error
message that Flink threw looked something like this:
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Could not run the jar.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
    ... 9 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
    at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
    ... 8 more
Caused by: org.codefeedr.pipeline.PipelineListException: ["org.codefeedr.plugin.twitter.stages.TwitterStatusInput","mongo_tweets","elasticsearch_tweets"]
    at org.codefeedr.pipeline.Pipeline.showList(Pipeline.scala:114)
    at org.codefeedr.pipeline.Pipeline.start(Pipeline.scala:100)
    at nl.wouterr.Main$.main(Main.scala:23)
    at nl.wouterr.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
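On our side, extracting the stage names from that response
comes down to something like this (a minimal sketch; the
function name is hypothetical, and the only thing it relies on
is the PipelineListException carrying a JSON array):

import json
import re

def parse_stages(error_message):
    """Pull the JSON array of stage names out of the 1.4.2 stack trace."""
    match = re.search(
        r"PipelineListException:\s*(\[.*?\])", error_message, re.DOTALL
    )
    if match is None:
        raise ValueError("No stage list found in the error response")
    return json.loads(match.group(1))

# e.g. parse_stages(trace) ->
# ["org.codefeedr.plugin.twitter.stages.TwitterStatusInput",
#  "mongo_tweets", "elasticsearch_tweets"]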
However, for 1.7.0 this trick doesn't work anymore, because
instead of returning the full stack trace, the REST API only
returns the following:
In the console of the JobManager it does print the full
stack trace, though. So first of all, I'm wondering if there
might be a way to enable more detailed stack traces in the
REST responses for Flink 1.7. If not, do you have any
suggestions on how to tackle this problem? I know that in the
end this isn't really a Flink problem, but you might know a
workaround in the Flink REST client to achieve the same. Two
alternatives I already considered:
- Running the jar with "--list --asException" locally
through the Python script (see the sketch after this list);
however, Flink and Scala are not provided in the jar.
Technically I could add them both to the classpath, but this
would require users to have the Flink jar locally (and also
Scala somewhere, but I assume most have that).
- Letting users provide a list of stage names for all their
(interconnected) Flink jobs. This is not really an option,
because the (main) idea behind this framework is to reduce the
boilerplate and cumbersome setup of complex stream processing
architectures.
Any help is appreciated. Thanks in advance!