Hello Flink community,
I created a fairly long batch workflow for my use case of clustering vertices using Flink and Gelly. Executing each part of the workflow individually (and writing intermediate results to disk) works as expected.

When combining workflow parts into longer jobs, I noticed a gap between the 'Job Name' time and the actual 'Start Time' in the Flink Dashboard. The longer the workflow chain, the bigger the gap gets. At this point, I think that Flink is creating the execution plan during this time, and the plan is executed directly afterwards.

As an example (90% of the workflow combined), I 'wait' 77-78 seconds for the execution plan; then the job is accepted for execution and needs another 7-9 seconds to process a small test dataset (<8k vertices with property values and edges). Each run was repeated 3 times. If I run only env.getExecutionPlan(), I wait a similar time for the execution plan. I attached the JSON execution plan to this post. For bigger datasets, both the execution plan creation time and the job execution time grow as well in my scenario.

When I now add a vertex-centric iteration to my workflow and start the Flink job, I don't get a result at all. I stopped the job (print execution plan to log) at the following point:
- waited > 20 hours after 'flink run ...'
- two cores on my machine are at 100% all the time working on the Flink job
- no entry in the Flink dashboard at all
- no entry in the log file after these lines:

###
[...]
org.apache.flink.client.CliFrontend - Starting execution of program
org.apache.flink.client.program.Client - Starting program in interactive mode
org.apache.flink.api.java.ExecutionEnvironment - The job has 2 registered types and 0 default Kryo serializers
org.apache.flink.optimizer.Optimizer - The parallelism of nested dataflows (such as step functions in iterations) is currently fixed to the parallelism of the surrounding operator (the iteration).
###

Most likely the workflow could be optimized in many ways to need less time at certain points (yes, I am not a Flink expert in many respects), but I think that long/complex workflows would still suffer from problems like this. Since every single step produces output (and some combined parts of the workflow do so, too), I currently suspect the Flink optimizer / execution plan creation to be the problem, so I am asking whether anyone here has experience with similar behavior.

Any suggestions on how I could successfully run long/complex workflows without running into such problems? ;) Even if there is no (instant) 'solution' to the problem, I would still be interested in opinions and ideas. Thanks in advance!

Best,
Markus

complexExecPlan.json
Hi Markus,

you might be right that a lot of time is spent in optimization. The optimizer enumerates all alternatives and chooses the plan with the least estimated cost. The degrees of freedom of the optimizer are rather restricted: it chooses the execution strategies and the partitioning & sorting keys that are used, but it does not optimize the order of operators. However, your plan consists of more than 250 operators, which is pretty large (in fact, I haven't seen a Flink plan of this size yet). I assume that this is only one part of the program that exceeded the 20 minutes of optimization, correct?

In order to verify that Flink is stuck in the optimization phase, you could take a stack trace to see which methods the Java process is currently executing. One way to improve the optimization time is to set a few JoinHints to reduce the degrees of freedom and the number of enumerated plans.

Hope this helps,
Fabian

2016-08-22 13:50 GMT+02:00 Markus Nentwig <[hidden email]>:
Hello Flink community,
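To make the JoinHint suggestion more concrete, here is a rough back-of-the-envelope sketch. It is only a toy model and not how Flink's optimizer actually counts or prunes plans: it assumes (hypothetically) that every join without a hint can use any of 5 strategies and that the choices multiply independently. The class name `PlanSpaceEstimate` and the method `candidatePlans` are made up for this illustration.

```java
import java.math.BigInteger;

// Toy model only: NOT Flink's real enumeration, which prunes alternatives.
final class PlanSpaceEstimate {

    // Assumption: each join without a hint has 5 candidate strategies.
    static final int STRATEGIES_PER_JOIN = 5;

    // Upper bound on strategy combinations when `hintedJoins` of the
    // `joins` joins are pinned to a single strategy by a hint.
    static BigInteger candidatePlans(int joins, int hintedJoins) {
        if (joins < 0 || hintedJoins < 0 || hintedJoins > joins) {
            throw new IllegalArgumentException("need 0 <= hintedJoins <= joins");
        }
        return BigInteger.valueOf(STRATEGIES_PER_JOIN).pow(joins - hintedJoins);
    }

    public static void main(String[] args) {
        int joins = 20; // hypothetical number of joins in a plan
        for (int hinted = 0; hinted <= joins; hinted += 5) {
            System.out.println(hinted + " hinted joins -> "
                    + candidatePlans(joins, hinted) + " combinations");
        }
    }
}
```

In the DataSet API, a hint is passed as the second argument of a join, for example `first.join(second, JoinHint.REPARTITION_HASH_FIRST)`. Which hint suits a given join depends on the input sizes, so the strategy named here is only a placeholder.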
Hi Fabian,
sorry for the late answer. The given execution plan was created after 20 minutes; only one vertex-centric iteration is missing. I can optimize the program because some operators are only needed to create intermediate debug results, but that is still not enough to run it as one Flink job. My current "solution" is to split the program into several parts and execute them on their own, writing intermediate results to disk, which works.

As for the stack trace, I created several for the "big" workflow, which does not finish the optimization phase at all. Here are three of them:
- shortly after start: http://pastebin.com/i7SMGbxa
- after ~32 minutes: http://pastebin.com/yJrYETxi
- after ~76 minutes: http://pastebin.com/fCsv8bie

I am no expert in analyzing these stack traces, but perhaps they help you in some way!? ;)
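For anyone who wants to collect similar snapshots at intervals, the JDK's `jstack <pid>` tool prints the stack traces of a running Java process from the outside. As an in-process alternative, the following is a minimal sketch using only the standard library. The class `StackSampler`, the thread name "plan-builder" and the `busyWork` stand-in are hypothetical; in a real program the sampled thread would be the one calling into Flink.

```java
import java.util.ArrayList;
import java.util.List;

final class StackSampler {

    // Formats the current stack of the given thread as text.
    static String snapshot(Thread target) {
        StringBuilder sb = new StringBuilder();
        sb.append('"').append(target.getName()).append("\" state=")
          .append(target.getState()).append('\n');
        for (StackTraceElement frame : target.getStackTrace()) {
            sb.append("    at ").append(frame).append('\n');
        }
        return sb.toString();
    }

    // Takes up to `count` snapshots, `intervalMillis` apart, while the
    // target thread is alive. Stops early if the sampler is interrupted.
    static List<String> sample(Thread target, int count, long intervalMillis) {
        List<String> snapshots = new ArrayList<>();
        for (int i = 0; i < count && target.isAlive(); i++) {
            snapshots.add(snapshot(target));
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag
                break;
            }
        }
        return snapshots;
    }

    // Hypothetical stand-in for long-running work such as plan creation.
    static long busyWork(long millis) {
        long end = System.nanoTime() + millis * 1_000_000L;
        long iterations = 0;
        while (System.nanoTime() < end) {
            iterations++;
        }
        return iterations;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> busyWork(500), "plan-builder");
        worker.start();
        for (String s : sample(worker, 3, 100)) {
            System.out.println(s);
        }
        worker.join();
    }
}
```

If the same optimizer methods show up at the top of every snapshot, that would support the assumption that the time is spent in plan enumeration rather than in job execution.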
Hi Markus,

thanks for the stack traces! I created the JIRA issue FLINK-4688 [1] to track this issue.

Fabian

[1] https://issues.apache.org/jira/browse/FLINK-4688

2016-09-26 17:58 GMT+02:00 Markus Nentwig <[hidden email]>:
Hi Fabian,