Complex batch workflow needs (too) much time to create executionPlan

Markus Nentwig
Hello Flink community,

I created a somewhat long batch workflow for my use case of clustering vertices using
Flink and Gelly. Executing each of the workflow parts individually (and writing
intermediate results to disk) works as expected.

When combining workflow parts into longer jobs, I noticed that the time shown
with the 'Job Name' and the actual 'Start Time' in the Flink Dashboard differ.
With longer workflow chains the difference gets bigger and bigger. During this
gap, I think, Flink is creating the execution plan, which is then executed
directly afterwards. As an example (90% of the workflow combined), I 'wait'
77-78 seconds for the execution plan, then the job is accepted for execution
and needs another 7-9 seconds to process a small test dataset (<8k vertices
with property values and edges) - each run repeated 3 times. If I run only
env.getExecutionPlan(), I wait a similar amount of time for the execution
plan. I added the JSON execution plan to this post. For bigger datasets the
execution plan creation time and the job execution time grow as well in my
scenario.
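
For reference, the plan-only measurement boils down to something like this
(just a minimal sketch; the tiny placeholder pipeline stands in for the
actual, much longer workflow):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class PlanTiming {
  public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // placeholder for the real workflow of DataSet/Gelly operators
    env.fromElements(1L, 2L, 3L)
       .output(new DiscardingOutputFormat<Long>());

    long start = System.currentTimeMillis();
    String planJson = env.getExecutionPlan();   // compiles/optimizes the plan, no job is submitted
    System.out.println("plan creation took " + (System.currentTimeMillis() - start) + " ms");
    System.out.println(planJson);
  }
}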

When I now add a vertex-centric iteration to my workflow and start the Flink
job, I don't get a result at all. I stopped the job (print execution plan to
log) at the following point:

- waited > 20 hours after 'flink run ...'
- two cores on my machine are at 100% the whole time, working on the Flink job
- no entry in the Flink dashboard at all
- no entry in the log file after these lines:

###
[...]
org.apache.flink.client.CliFrontend            - Starting execution of program
org.apache.flink.client.program.Client         - Starting program in interactive mode
org.apache.flink.api.java.ExecutionEnvironment - The job has 2 registered types and 0 default Kryo serializers
org.apache.flink.optimizer.Optimizer           - The parallelism of nested dataflows (such as step functions in iterations) is currently fixed to the parallelism of the surrounding operator (the iteration).
###

Most likely the workflow could be optimized in many ways to need less time at
certain points (yes, I am not a Flink expert in many places), but I think that
long/complex workflows would still suffer from problems like this.
Since every single step produces output (and some combined parts of the
workflow do so, too), I currently suspect the Flink optimizer / execution plan
creation to be the problem, and therefore want to ask whether anyone here has
experience with similar behavior. Any suggestions on how I could successfully
run long/complex workflows without running into such problems? ;)

Even if there is no (instant) 'solution' to the problem, I would still be
interested in opinions and ideas. Thanks in advance!

Best,
Markus

complexExecPlan.json
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n8596/complexExecPlan.json>

Re: Complex batch workflow needs (too) much time to create executionPlan

Fabian Hueske-2
Hi Markus,

You might be right that a lot of time is spent in optimization.
The optimizer enumerates all alternatives and chooses the plan with the least
estimated cost. The degrees of freedom of the optimizer are rather restricted
(execution strategies and the partitioning & sorting keys; the order of
operators is not optimized). However, your plan consists of more than 250
operators, which is pretty large (in fact, I haven't seen a Flink plan of this
size yet).
I assume that this is only one part of the program that exceeded the 20
minutes of optimization, correct?

In order to verify that Flink is stuck in the optimization phase, you could take a stacktrace to see which methods the Java process is currently executing.
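
If attaching jstack to the client JVM is not convenient, a crude alternative
(just a sketch, plain Java, nothing Flink-specific) is a small watchdog thread
in the client program that periodically dumps all thread stack traces:

// start this in the client program before building/optimizing the plan
Thread watchdog = new Thread(() -> {
    while (!Thread.currentThread().isInterrupted()) {
        Thread.getAllStackTraces().forEach((thread, frames) -> {
            System.err.println("=== " + thread.getName());
            for (StackTraceElement frame : frames) {
                System.err.println("    at " + frame);
            }
        });
        try {
            Thread.sleep(60_000);   // dump once per minute
        } catch (InterruptedException e) {
            return;
        }
    }
});
watchdog.setDaemon(true);           // don't keep the JVM alive because of the watchdog
watchdog.start();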

One way to improve the optimization time is to set a few JoinHints to reduce the degrees of freedom and the number of enumerated plans.
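
For example (a minimal, self-contained sketch; the two tiny inputs are just
placeholders for the data sets you actually join):

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class JoinHintExample {
  public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // placeholder inputs standing in for the data sets of your workflow
    DataSet<Tuple2<Long, String>> small =
        env.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"));
    DataSet<Tuple2<Long, Double>> large =
        env.fromElements(Tuple2.of(1L, 0.5), Tuple2.of(2L, 0.7));

    // the hint fixes the join strategy (here: broadcast the small second input),
    // so the optimizer does not enumerate alternative strategies for this join
    large.join(small, JoinHint.BROADCAST_HASH_SECOND)
         .where(0)
         .equalTo(0)
         .print();
  }
}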

Hope this helps,
Fabian


Re: Complex batch workflow needs (too) much time to create executionPlan

Markus Nentwig
Hi Fabian,

First of all, sorry for the late answer. The execution plan I attached was
created after 20 minutes; only one vertex-centric iteration is missing. I can
optimize the program somewhat, because some operators are only needed to
create intermediate debug results, but that is still not enough to run it as
one Flink job. My current "solution" is to split the program into several
parts and execute them on their own, writing intermediate results to disk,
which works.
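
In essence, the split looks like this (just a sketch with placeholder types
and paths; in reality part 1 and part 2 are two separate programs):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class SplitWorkflow {
  public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // --- part 1: placeholder for the first portion of the workflow ---
    DataSet<Tuple2<Long, Long>> intermediate =
        env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L));
    intermediate.writeAsCsv("file:///tmp/clustering/intermediate", WriteMode.OVERWRITE);
    env.execute("clustering part 1");

    // --- part 2: reads the persisted result and continues the workflow ---
    DataSet<Tuple2<Long, Long>> resumed = env
        .readCsvFile("file:///tmp/clustering/intermediate")
        .types(Long.class, Long.class);
    resumed.writeAsCsv("file:///tmp/clustering/part2-result", WriteMode.OVERWRITE);
    env.execute("clustering part 2");
  }
}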

As for the stacktraces, I created some for the "big" workflow, which does not finish the optimization phase at all; here are three of them:
shortly after start: http://pastebin.com/i7SMGbxa
after ~32 minutes: http://pastebin.com/yJrYETxi
after ~76 minutes: http://pastebin.com/fCsv8bie

I am no expert in analyzing these stacktraces, but perhaps they help you in some way!? ;)

Re: Complex batch workflow needs (too) much time to create executionPlan

Fabian Hueske-2
Hi Markus,

thanks for the stacktraces!
The client is indeed stuck in the optimizer; I'll have to look into this a bit more.
Did you try to set JoinHints in your plan? That should reduce the plan space that is enumerated and therefore reduce the optimization time (maybe enough to run your application as a single program).

Nonetheless, I think we should look into the optimizer and check how we can improve the plan enumeration.
I created the JIRA issue FLINK-4688 [1] to track this.

Thanks for reporting this,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-4688


