Hi everyone,
I have a basic question regarding scheduling of batch programs. Let's take the following graph:

          -> Group Combine -> ...
         /
Source ----> Group Combine -> ...
         \
          -> Map -> ...

So, a source followed by three operators with ship strategy "Forward" and exchange mode "pipelined".

The three flows are later joined again, so that this results in a single job.

When the job is started, at first only one of the operators immediately receives the input read by the source and can therefore run concurrently with the source. Once the source is finished, the other two operators are scheduled.

Two questions about this:

1) Why doesn't the source forward the records to all three operators while it is still running?
2) How does the jobmanager decide which of the three operators receives the pipelined data first?

Cheers and Thanks,

Konstantin

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
Hi Konstantin,

the DataSet API tries to execute all operators as soon as possible.

I assume that in your case, Flink does not do this because it tries to avoid a deadlock. A dataflow which replicates data from the same source and joins it again might get deadlocked because all pipelines need to make progress in order to finish the source.

Think of a simple example like this:

      /-- Map1 --\
Src --<           >-Join
      \-- Map2 --/

If the join is executed as a hash join, one input (Map1) is used to build a hash table. Only once the hash table is built can the other input (Map2) be consumed.
If both Map operators ran at the same time, Map2 would stall at some point because it cannot emit any more data due to the backpressure of the not-yet-opened probe input of the hash join.
Once Map2 stalls, the Source would stall and Map1 could not continue to finish the build side. At this point we have a deadlock.

Flink detects these situations and adds an artificial pipeline breaker in the dataflow to prevent deadlocks. Due to the pipeline breaker, the build side is completed before the probe side input is processed.

This also answers the question of which operator is executed first: the operator on the build side of the first join. Hence the join strategy chosen by the optimizer (BUILD_FIRST, BUILD_SECOND) decides.
You can also give a manual JoinHint to control that. If you give a SORT_MERGE hint, all three operators should run concurrently because both join inputs will be consumed concurrently for sorting.

Best, Fabian
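To make the deadlock argument above concrete, here is a small, single-threaded toy model of the Src / Map1 / Map2 / hash join plan. It is only a sketch under stated assumptions, not Flink's runtime: the class name PipelineDeadlockSketch, the method run and the parameter probeBufferCapacity are made up for illustration. A bounded probe buffer stands in for a pipelined exchange with backpressure, and an effectively unbounded one stands in for the pipeline breaker that lets the build side finish first.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model (hypothetical, not Flink code): one source feeds both inputs of a hash join.
class PipelineDeadlockSketch {

    /**
     * Returns true if the job finishes, false if the source gets stuck.
     * probeBufferCapacity models how many records the Map2 -> join exchange
     * can hold while the probe input of the join is still closed.
     */
    static boolean run(int records, int probeBufferCapacity) {
        Deque<Integer> buildBuffer = new ArrayDeque<>(); // Map1 -> join (build side)
        Deque<Integer> probeBuffer = new ArrayDeque<>(); // Map2 -> join (probe side)
        int emitted = 0;
        int built = 0;

        while (emitted < records) {
            // The source must hand every record to BOTH branches. If the probe
            // branch cannot accept it, the source blocks, so the build side
            // never sees end-of-input and the probe side is never opened.
            if (probeBuffer.size() >= probeBufferCapacity) {
                return false; // deadlock
            }
            buildBuffer.add(emitted);
            probeBuffer.add(emitted);
            emitted++;

            // The join consumes the build side eagerly to fill its hash table.
            while (!buildBuffer.isEmpty()) {
                buildBuffer.poll();
                built++;
            }
        }

        // Source finished: the hash table is complete, so the probe side opens.
        int probed = 0;
        while (!probeBuffer.isEmpty()) {
            probeBuffer.poll();
            probed++;
        }
        return built == records && probed == records;
    }

    public static void main(String[] args) {
        System.out.println("pipelined probe side, small buffer: " + run(100, 10));
        System.out.println("with pipeline breaker: " + run(100, Integer.MAX_VALUE));
    }
}
```

In this model the job only gets stuck when the input is larger than what the probe-side exchange can buffer, which matches the description that Map2 stalls "at some point". As far as I know, the sort-merge hint mentioned above is spelled JoinHint.REPARTITION_SORT_MERGE in the DataSet API and is passed as the second argument to join(...).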
Hi Fabian,
I see, thanks for the quick explanation.

Cheers,

Konstantin