What is the best way to handle data skew processing in Data Stream applications?

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

What is the best way to handle data skew processing in Data Stream applications?

Felipe Gutierrez
Hi,

I am studying data skew processing in Flink and how I can change the low-level control of physical partition (https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning) in order to have an even processing of tuples. I have created synthetic skewed data sources and I aim to process (aggregate) them over a window. Here is the complete code: https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeyDAG.java#L61

streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.rebalance() // or .rescale() .shuffle()
.keyBy(new StationPlatformKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.setParallelism(4)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;

According to the Flink dashboard I could not see too much difference among .shuffle(), .rescale(), and .rebalance(). Even though the documentation says rebalance() transformation is more suitable for data skew.

After that I tried to use .partitionCustom(partitioner, "someKey"). However, for my surprise, I could not use setParallelism(4) on the window operation. The documentation says "Note: This operation is inherently non-parallel since all elements have to pass through the same operator instance.". I did not understand why. If I am allowed to do partitionCustom why can't I use parallelism after that?

streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector())
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
Reply | Threaded
Open this post in threaded view
|

Re: What is the best way to handle data skew processing in Data Stream applications?

Fabian Hueske-2
Hi Felipe,

three comments:

1) applying rebalance(), shuffle(), or rescale() before a keyBy() has no effect:
keyBy() introduces a hash partitioning such that any data partitioning that you do immediately before keyBy() is destroyed.
You only change the distribution for the call of the key extractor which should be a lightweight function anyway.
That's why you do not see any difference between the three methods.

2) windowAll() defines a non-keyed window over the whole stream.
All records are processed by the same non-parallel instance of the window operator.
That's why assigning a higher parallelism to that operator does not help.

3) One approach to improve the processing of skewed data, is to change how keyed state is handled.
Flink's keyed state is partitioned in two steps:
1. each key is assigned to a key group based on an internal hash function.
2. each key group is assigned to and processed by a parallel operator task.
For full control over data placement, you need to control both. 
Changing 1) is tricky because it affects savepoint compatibility.
Changing 2) does not help if two hot keys are assigned to the same keyed state.

Best, Fabian

Am Mi., 10. Apr. 2019 um 11:50 Uhr schrieb Felipe Gutierrez <[hidden email]>:
Hi,

I am studying data skew processing in Flink and how I can change the low-level control of physical partition (https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning) in order to have an even processing of tuples. I have created synthetic skewed data sources and I aim to process (aggregate) them over a window. Here is the complete code: https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeyDAG.java#L61

streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.rebalance() // or .rescale() .shuffle()
.keyBy(new StationPlatformKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.setParallelism(4)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;

According to the Flink dashboard I could not see too much difference among .shuffle(), .rescale(), and .rebalance(). Even though the documentation says rebalance() transformation is more suitable for data skew.

After that I tried to use .partitionCustom(partitioner, "someKey"). However, for my surprise, I could not use setParallelism(4) on the window operation. The documentation says "Note: This operation is inherently non-parallel since all elements have to pass through the same operator instance.". I did not understand why. If I am allowed to do partitionCustom why can't I use parallelism after that?

streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector())
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
Reply | Threaded
Open this post in threaded view
|

Re: What is the best way to handle data skew processing in Data Stream applications?

Till Rohrmann
Just a small addition:

If two hot keys fall into two key groups which are being processed by the same TM, then it could help to change the parallelism, because then the key group mapping might be different.

If two hot keys fall into the same key group, you can adjust the max parallelism which defines how many key groups will be used. By changing the number, it might happen that the two hot keys fall into different key groups.

Cheers,
Till

On Thu, Apr 11, 2019 at 9:22 AM Fabian Hueske <[hidden email]> wrote:
Hi Felipe,

three comments:

1) applying rebalance(), shuffle(), or rescale() before a keyBy() has no effect:
keyBy() introduces a hash partitioning such that any data partitioning that you do immediately before keyBy() is destroyed.
You only change the distribution for the call of the key extractor which should be a lightweight function anyway.
That's why you do not see any difference between the three methods.

2) windowAll() defines a non-keyed window over the whole stream.
All records are processed by the same non-parallel instance of the window operator.
That's why assigning a higher parallelism to that operator does not help.

3) One approach to improve the processing of skewed data, is to change how keyed state is handled.
Flink's keyed state is partitioned in two steps:
1. each key is assigned to a key group based on an internal hash function.
2. each key group is assigned to and processed by a parallel operator task.
For full control over data placement, you need to control both. 
Changing 1) is tricky because it affects savepoint compatibility.
Changing 2) does not help if two hot keys are assigned to the same keyed state.

Best, Fabian

Am Mi., 10. Apr. 2019 um 11:50 Uhr schrieb Felipe Gutierrez <[hidden email]>:
Hi,

I am studying data skew processing in Flink and how I can change the low-level control of physical partition (https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning) in order to have an even processing of tuples. I have created synthetic skewed data sources and I aim to process (aggregate) them over a window. Here is the complete code: https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeyDAG.java#L61

streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.rebalance() // or .rescale() .shuffle()
.keyBy(new StationPlatformKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.setParallelism(4)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;

According to the Flink dashboard I could not see too much difference among .shuffle(), .rescale(), and .rebalance(). Even though the documentation says rebalance() transformation is more suitable for data skew.

After that I tried to use .partitionCustom(partitioner, "someKey"). However, for my surprise, I could not use setParallelism(4) on the window operation. The documentation says "Note: This operation is inherently non-parallel since all elements have to pass through the same operator instance.". I did not understand why. If I am allowed to do partitionCustom why can't I use parallelism after that?

streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector())
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
Reply | Threaded
Open this post in threaded view
|

Re: What is the best way to handle data skew processing in Data Stream applications?

Felipe Gutierrez
thanks All for your suggestions!

I am not sure if the option 3 that Fabian said I will need to change the Flink source code or it can be implemented on top of Flink.
-------------------------
3) One approach to improve the processing of skewed data, is to change how keyed state is handled. 
Flink's keyed state is partitioned in two steps: 
1. each key is assigned to a key group based on an internal hash function. 
2. each key group is assigned to and processed by a parallel operator task.
For full control over data placement, you need to control both. 
Changing 1) is tricky because it affects savepoint compatibility. 
Changing 2) does not help if two hot keys are assigned to the same keyed state.
-------------------------
I did an experiment with a Mapper function that maps to a key with one more parameter (a skew parameter). The results are better.

Integer skewParameter = 0;
if (stationId.equals(new Integer(2)) && platformId.equals(new Integer(3))) { // this is the skewed key
skewParameter = this.skewParameterGenerator.getNextItem();
}
CompositeSkewedKeyStationPlatform compositeKey = new CompositeSkewedKeyStationPlatform(stationId, platformId, skewParameter);

But it is still a static solution =(. I mean, the developer has to set on the Mapper which key is skewed.

Best,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez


On Thu, Apr 11, 2019 at 1:49 PM Till Rohrmann <[hidden email]> wrote:
Just a small addition:

If two hot keys fall into two key groups which are being processed by the same TM, then it could help to change the parallelism, because then the key group mapping might be different.

If two hot keys fall into the same key group, you can adjust the max parallelism which defines how many key groups will be used. By changing the number, it might happen that the two hot keys fall into different key groups.

Cheers,
Till

On Thu, Apr 11, 2019 at 9:22 AM Fabian Hueske <[hidden email]> wrote:
Hi Felipe,

three comments:

1) applying rebalance(), shuffle(), or rescale() before a keyBy() has no effect:
keyBy() introduces a hash partitioning such that any data partitioning that you do immediately before keyBy() is destroyed.
You only change the distribution for the call of the key extractor which should be a lightweight function anyway.
That's why you do not see any difference between the three methods.

2) windowAll() defines a non-keyed window over the whole stream.
All records are processed by the same non-parallel instance of the window operator.
That's why assigning a higher parallelism to that operator does not help.

3) One approach to improve the processing of skewed data, is to change how keyed state is handled.
Flink's keyed state is partitioned in two steps:
1. each key is assigned to a key group based on an internal hash function.
2. each key group is assigned to and processed by a parallel operator task.
For full control over data placement, you need to control both. 
Changing 1) is tricky because it affects savepoint compatibility.
Changing 2) does not help if two hot keys are assigned to the same keyed state.

Best, Fabian

Am Mi., 10. Apr. 2019 um 11:50 Uhr schrieb Felipe Gutierrez <[hidden email]>:
Hi,

I am studying data skew processing in Flink and how I can change the low-level control of physical partition (https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning) in order to have an even processing of tuples. I have created synthetic skewed data sources and I aim to process (aggregate) them over a window. Here is the complete code: https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeyDAG.java#L61

streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.rebalance() // or .rescale() .shuffle()
.keyBy(new StationPlatformKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.setParallelism(4)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;

According to the Flink dashboard I could not see too much difference among .shuffle(), .rescale(), and .rebalance(). Even though the documentation says rebalance() transformation is more suitable for data skew.

After that I tried to use .partitionCustom(partitioner, "someKey"). However, for my surprise, I could not use setParallelism(4) on the window operation. The documentation says "Note: This operation is inherently non-parallel since all elements have to pass through the same operator instance.". I did not understand why. If I am allowed to do partitionCustom why can't I use parallelism after that?

streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector())
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
Reply | Threaded
Open this post in threaded view
|

Re: What is the best way to handle data skew processing in Data Stream applications?

Felipe Gutierrez
I guess I could implement a solution which is not static and extends the OneInputStreamOperator Flink operator. 

Best,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez


On Thu, Apr 11, 2019 at 2:21 PM Felipe Gutierrez <[hidden email]> wrote:
thanks All for your suggestions!

I am not sure if the option 3 that Fabian said I will need to change the Flink source code or it can be implemented on top of Flink.
-------------------------
3) One approach to improve the processing of skewed data, is to change how keyed state is handled. 
Flink's keyed state is partitioned in two steps: 
1. each key is assigned to a key group based on an internal hash function. 
2. each key group is assigned to and processed by a parallel operator task.
For full control over data placement, you need to control both. 
Changing 1) is tricky because it affects savepoint compatibility. 
Changing 2) does not help if two hot keys are assigned to the same keyed state.
-------------------------
I did an experiment with a Mapper function that maps to a key with one more parameter (a skew parameter). The results are better.

Integer skewParameter = 0;
if (stationId.equals(new Integer(2)) && platformId.equals(new Integer(3))) { // this is the skewed key
skewParameter = this.skewParameterGenerator.getNextItem();
}
CompositeSkewedKeyStationPlatform compositeKey = new CompositeSkewedKeyStationPlatform(stationId, platformId, skewParameter);

But it is still a static solution =(. I mean, the developer has to set on the Mapper which key is skewed.

Best,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez


On Thu, Apr 11, 2019 at 1:49 PM Till Rohrmann <[hidden email]> wrote:
Just a small addition:

If two hot keys fall into two key groups which are being processed by the same TM, then it could help to change the parallelism, because then the key group mapping might be different.

If two hot keys fall into the same key group, you can adjust the max parallelism which defines how many key groups will be used. By changing the number, it might happen that the two hot keys fall into different key groups.

Cheers,
Till

On Thu, Apr 11, 2019 at 9:22 AM Fabian Hueske <[hidden email]> wrote:
Hi Felipe,

three comments:

1) applying rebalance(), shuffle(), or rescale() before a keyBy() has no effect:
keyBy() introduces a hash partitioning such that any data partitioning that you do immediately before keyBy() is destroyed.
You only change the distribution for the call of the key extractor which should be a lightweight function anyway.
That's why you do not see any difference between the three methods.

2) windowAll() defines a non-keyed window over the whole stream.
All records are processed by the same non-parallel instance of the window operator.
That's why assigning a higher parallelism to that operator does not help.

3) One approach to improve the processing of skewed data, is to change how keyed state is handled.
Flink's keyed state is partitioned in two steps:
1. each key is assigned to a key group based on an internal hash function.
2. each key group is assigned to and processed by a parallel operator task.
For full control over data placement, you need to control both. 
Changing 1) is tricky because it affects savepoint compatibility.
Changing 2) does not help if two hot keys are assigned to the same keyed state.

Best, Fabian

Am Mi., 10. Apr. 2019 um 11:50 Uhr schrieb Felipe Gutierrez <[hidden email]>:
Hi,

I am studying data skew processing in Flink and how I can change the low-level control of physical partition (https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning) in order to have an even processing of tuples. I have created synthetic skewed data sources and I aim to process (aggregate) them over a window. Here is the complete code: https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeyDAG.java#L61

streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.rebalance() // or .rescale() .shuffle()
.keyBy(new StationPlatformKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.setParallelism(4)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;

According to the Flink dashboard I could not see too much difference among .shuffle(), .rescale(), and .rebalance(). Even though the documentation says rebalance() transformation is more suitable for data skew.

After that I tried to use .partitionCustom(partitioner, "someKey"). However, for my surprise, I could not use setParallelism(4) on the window operation. The documentation says "Note: This operation is inherently non-parallel since all elements have to pass through the same operator instance.". I did not understand why. If I am allowed to do partitionCustom why can't I use parallelism after that?

streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector())
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez