Flink Cross Strategies

Flink Cross Strategies

gen-too
Hi,

I would like to know how the Flink cross function works. I found that
there are four strategies (NESTEDLOOP_BLOCKED_OUTER_FIRST,
NESTEDLOOP_BLOCKED_OUTER_SECOND, NESTEDLOOP_STREAMED_OUTER_FIRST,
NESTEDLOOP_STREAMED_OUTER_SECOND), but I need some more detailed
explanations, please.

Thanks

Re: Flink Cross Strategies

Fabian Hueske-2
Hi,

First of all, Cross is a *very* expensive operation unless you can ensure that one side is very small, because it combines every record of one input with every record of the other. If one input fits into memory, it is usually better to use a MapFunction with a broadcast set. If both sides can be large, Cross will take a very long time.
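A minimal sketch of why a broadcast set can replace Cross when the Cross is followed by a per-record aggregation (here, finding the nearest of a few centers for each point, an illustrative task not taken from this thread). Plain Python stands in for Flink: the function names are hypothetical, and the in-memory list `centers` only models what a broadcast set gives each map task.

```python
from typing import Dict, List, Tuple

Point = Tuple[float, float]


def sq_dist(a: Point, b: Point) -> float:
    return (a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2


def cross_then_reduce(points: List[Point], centers: List[Point]) -> Dict[Point, Point]:
    # Materializes all len(points) * len(centers) pairs, as a Cross would,
    # and then keeps the closest center for each point.
    pairs = [(p, c) for p in points for c in centers]
    best: Dict[Point, Point] = {}
    for p, c in pairs:
        if p not in best or sq_dist(p, c) < sq_dist(p, best[p]):
            best[p] = c
    return best


def map_with_broadcast(points: List[Point], centers: List[Point]) -> Dict[Point, Point]:
    # The small input stays in memory (the role of a broadcast set) and every
    # record of the large input yields exactly one result, as in a map.
    return {p: min(centers, key=lambda c: sq_dist(p, c)) for p in points}


points = [(0.0, 0.0), (5.0, 5.0), (9.0, 1.0)]
centers = [(1.0, 1.0), (8.0, 8.0), (10.0, 0.0)]
```

Both functions return the same mapping, but the map-style version never materializes the full set of pairs.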
That being said, the strategies work as follows:
- NESTEDLOOP_STREAMED_OUTER_FIRST: The first (or left) input is the outer side of a nested loop. The second (or right) input is buffered (potentially spilled to disk). For each record of the outer input, we read all records of the buffered inner input and combine each of them with the outer record. Hence, the order of the outer side is preserved.
- NESTEDLOOP_BLOCKED_OUTER_FIRST: The first (or left) input is the outer side of a nested loop. The second (or right) input is buffered (potentially spilled to disk). The outer side is consumed in blocks of records. For each block of outer records, the inner side is read and each record of the inner side is combined with all outer records in the block. This strategy destroys the sort order of the outer side.
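The two loops can be sketched in plain Python as follows. The function names are hypothetical and this is not Flink's implementation: a Python list models the buffered inner input (which Flink may spill to disk), and `block_size` is an illustrative parameter, not a Flink setting. Both functions yield (outer, inner) pairs.

```python
from typing import Iterable, Iterator, List, Sequence, Tuple, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def streamed_cross(outer: Iterable[T], inner: Iterable[U]) -> Iterator[Tuple[T, U]]:
    buffered_inner: List[U] = list(inner)  # inner side is buffered once
    for o in outer:                        # one full pass over the inner per outer record
        for i in buffered_inner:
            yield (o, i)


def blocked_cross(outer: Sequence[T], inner: Iterable[U], block_size: int) -> Iterator[Tuple[T, U]]:
    buffered_inner: List[U] = list(inner)  # inner side is buffered once
    for start in range(0, len(outer), block_size):
        block = outer[start:start + block_size]  # one block of outer records in memory
        for i in buffered_inner:                 # one pass over the inner per block
            for o in block:
                yield (o, i)


streamed = list(streamed_cross([1, 2, 3, 4], ["a", "b"]))
blocked = list(blocked_cross([1, 2, 3, 4], ["a", "b"], block_size=2))
```

Both produce the same eight pairs, but the outer values come out as 1, 1, 2, 2, 3, 3, 4, 4 for the streamed loop and as 1, 2, 1, 2, 3, 4, 3, 4 for the blocked loop, which is the loss of outer order described above. The ..._OUTER_SECOND variants correspond to calling the same functions with the two inputs swapped.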

The other two strategies (NESTEDLOOP_STREAMED_OUTER_SECOND and NESTEDLOOP_BLOCKED_OUTER_SECOND) swap the outer and inner sides and are otherwise symmetric.

The benefit of the blocked strategy is that we iterate over the inner side only once per block, and not once per individual record as the streamed strategy does. However, the blocked variant destroys the order of the outer side.
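A back-of-the-envelope model of that benefit, assuming the inner side is re-read in full on every pass. The helper names are hypothetical, and the 1000-record block size used below is an arbitrary illustrative value, not a Flink setting.

```python
import math
from typing import Optional


def inner_passes(outer_records: int, block_size: Optional[int] = None) -> int:
    """Number of full passes over the buffered inner side.

    block_size=None models the streamed strategy (one pass per outer record);
    an integer models the blocked strategy (one pass per block of outer records).
    """
    if block_size is None:
        return outer_records
    return math.ceil(outer_records / block_size)


def records_read_from_inner(outer_records: int, inner_records: int,
                            block_size: Optional[int] = None) -> int:
    # Every pass reads the whole inner side.
    return inner_passes(outer_records, block_size) * inner_records
```

For an outer side of 6000 records and an inner side of 4000 records, the streamed loop reads 6000 × 4000 = 24,000,000 inner records, while blocks of 1000 outer records need only 6 passes, i.e. 24,000 inner records. The number of result pairs is 24,000,000 in both cases; only the work spent re-reading the inner side differs.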

Best, Fabian

(In reply to gen-too, 2017-04-04 11:21 GMT+02:00)

Re: Flink Cross Strategies

gen-too
Hi Fabian,

thanks for your answer!

How does Flink select the cross strategy?

To help me understand, let's assume the following example scenario: I have two DataSets consisting of 6000 and 4000 records (stored as files in HDFS) and I want to do the cross operation. Let's say that I have a parallelism of 2. What does the general workflow look like then?


Thanks in advance


(In reply to Fabian Hueske, 05.04.2017 09:56)

Re: Flink Cross Strategies

Fabian Hueske-2
The DataSet API has three methods:
- cross()
- crossWithTiny()
- crossWithHuge()

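The latter two can be used to give the optimizer a hint about the size of the inputs: crossWithTiny() indicates that the second DataSet (the argument) is much smaller than the first, and crossWithHuge() indicates that it is much larger.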
The optimizer will broadcast the smaller input to each parallel task and perform a local cross join.
The local join will be either streamed with the smaller input being on the inner side or blocked with the smaller input being the outer side.
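Applied to the scenario from the question (6000 and 4000 records, parallelism 2), the broadcast plan can be sketched as a simple count. `broadcast_cross_plan` is a hypothetical helper, and the even split of the larger input across tasks is an assumption made for illustration; in practice its partitioning depends on how the input is read (for example, HDFS splits).

```python
from typing import List, Tuple


def broadcast_cross_plan(first_count: int, second_count: int,
                         parallelism: int) -> List[Tuple[int, int]]:
    """Per parallel task: (records of the larger input, records of the smaller input)."""
    larger, smaller = max(first_count, second_count), min(first_count, second_count)
    base, extra = divmod(larger, parallelism)
    # Assumption: the larger input is split as evenly as possible across tasks.
    # The smaller input is broadcast, i.e. every task receives all of it.
    return [(base + (1 if task < extra else 0), smaller) for task in range(parallelism)]


plan = broadcast_cross_plan(6000, 4000, parallelism=2)
pairs_per_task = [large * small for large, small in plan]
shipped_small_records = sum(small for _, small in plan)
```

Each of the two tasks crosses 3000 records of the larger input with all 4000 records of the broadcast smaller input, i.e. 12,000,000 pairs per task and 24,000,000 pairs in total, and the smaller input is shipped once to each task (2 × 4000 records).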

Whether the streamed or blocked strategy is chosen depends on whether the plan can leverage the preserved sort order of the outer side.
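The choice described above can be modeled as a small decision function. This is a hypothetical sketch of the rule as stated in this message, not code from Flink's optimizer; the function and parameter names are illustrative.

```python
def choose_local_strategy(smaller_is_first: bool, need_larger_side_order: bool) -> str:
    """Hypothetical model: pick a local nested-loop strategy for a broadcast cross."""
    if need_larger_side_order:
        # Streamed: the smaller input is the inner side, so the larger input is
        # the outer side and its order is preserved.
        return ("NESTEDLOOP_STREAMED_OUTER_SECOND" if smaller_is_first
                else "NESTEDLOOP_STREAMED_OUTER_FIRST")
    # Blocked: the smaller input is the outer side and is consumed in blocks.
    return ("NESTEDLOOP_BLOCKED_OUTER_FIRST" if smaller_is_first
            else "NESTEDLOOP_BLOCKED_OUTER_SECOND")
```

With the smaller input as the second DataSet (as with crossWithTiny()), a plan that can use the larger side's order would get NESTEDLOOP_STREAMED_OUTER_FIRST under this model, and otherwise NESTEDLOOP_BLOCKED_OUTER_SECOND.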

Best, Fabian

(In reply to gen-too, 2017-04-05 11:17 GMT+02:00)