[DISCUSSION] Using `DataStreamUtils.reinterpretAsKeyedStream` on a pre-partitioned Kafka topic

Robin Cassan
Hi all!

We are trying to build a Flink job that consumes a Kafka topic, groups
the incoming events into session windows according to a String that is
derived by parsing each message (we call it `SessionKey`), and does
some processing on the windows before sending them to another Kafka
topic.
Our first implementation used a `keyBy` operator on the incoming
messages before creating the window, but we realized that we could
pre-partition our data by `SessionKey` with a custom component when we
insert it into the input Kafka topic. This would avoid having to
shuffle data around in Flink, since, for a given `SessionKey`, we would
ensure that all messages with this key end up in the same Kafka
partition and are therefore read by the same subtask, on a single
TaskManager. This means that we should be able to create a keyed stream
from the incoming data without having to transfer data between
TaskManagers.

To achieve that, we have used the `reinterpretAsKeyedStream` method
instead of the previous `keyBy`. This got rid of the shuffling step,
but we are wondering whether this is the right way of using this
feature, and whether Flink can match the distribution of keys coming
from Kafka with the key groups assigned to each TaskManager.
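
For reference, here is a minimal sketch of what the change looked like
(the `Event` type, `EventDeserializationSchema`, `SessionProcessor`
and the 30-minute session gap are hypothetical placeholders, not our
actual code):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class SessionJobSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProperties = new Properties(); // bootstrap.servers, group.id, ...

        // The input topic is pre-partitioned by SessionKey by our custom component.
        DataStream<Event> events = env.addSource(new FlinkKafkaConsumer<>(
                "sessions-input", new EventDeserializationSchema(), kafkaProperties));

        // Before: an explicit keyBy, which shuffles records between TaskManagers.
        // KeyedStream<Event, String> keyed = events.keyBy(Event::getSessionKey);

        // After: declare the stream as already keyed by SessionKey, so no shuffle is inserted.
        KeyedStream<Event, String> keyed = DataStreamUtils.reinterpretAsKeyedStream(
                events, (KeySelector<Event, String>) Event::getSessionKey);

        keyed.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
             .process(new SessionProcessor()) // hypothetical ProcessWindowFunction
             .print();                        // stand-in for the output Kafka sink

        env.execute("session-windows");
    }
}
```
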
We have observed that, while attempting to trigger a savepoint, we
would encounter exceptions that seem to indicate that the TaskManagers
received data whose `SessionKey` didn't match their assigned key
groups. Here is one of the stack traces we saw while savepointing:

```
java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:316)
at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258)
at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:223)
at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:176)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:392)
```
We are currently using Flink 1.8.2 on Kubernetes, savepointing to
Amazon S3.

Is our observation correct that Flink is not able to match the Kafka
partitioning with the TaskManagers' assigned key groups?
And if so, do you have any pointers on how we could pre-partition our
data in Kafka so that Flink can avoid shuffling data before creating
the session windows?

Cheers,

Robin

--
Robin CASSAN
Data Engineer
+33 6 50 77 88 36
5 boulevard de la Madeleine - 75001 Paris

Re: [DISCUSSION] Using `DataStreamUtils.reinterpretAsKeyedStream` on a pre-partitioned Kafka topic

Congxian Qiu
Hi

From the doc[1], the DataStream MUST already be pre-partitioned in EXACTLY the same way Flink’s keyBy would partition the data in a shuffle w.r.t. key-group assignment.
You should make sure that each key lands in the right key group, and that each key group lands on the right parallel subtask. You can refer to KeyGroupRangeAssignment[2] for more information.
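
For example, a minimal sketch of how to check where a given key would land (the maxParallelism and parallelism values below are only example assumptions, not something read from your job):

```java
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupCheck {
    public static void main(String[] args) {
        int maxParallelism = 128; // assumed job maxParallelism
        int parallelism = 4;      // assumed operator parallelism
        String sessionKey = "example-session-key";

        // key group = murmurHash(key.hashCode()) % maxParallelism
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(sessionKey, maxParallelism);

        // index of the parallel subtask that owns this key group
        int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                sessionKey, maxParallelism, parallelism);

        System.out.printf("key=%s keyGroup=%d subtask=%d%n", sessionKey, keyGroup, subtask);
    }
}
```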


Re: [DISCUSSION] Using `DataStreamUtils.reinterpretAsKeyedStream` on a pre-partitioned Kafka topic

Gyula Fóra
Hi!

As far as I know, even if you pre-partition the data in Kafka in exactly the same way using the key groups, you have no guarantee that the Kafka consumer source will pick up the right partitions.

It might work if you have exactly as many Kafka partitions as key groups / max parallelism, partitioned correctly, but even then you might have to use a custom source to get the correct partition assignment for the subtasks.

Long story short, I believe the built-in Kafka source doesn't support what you want. But it should be possible to adapt it to do so.
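
For illustration only, a hypothetical producer-side partitioner that writes each record to the Kafka partition matching its Flink key group could look like the sketch below. It assumes the topic has exactly maxParallelism partitions, and it still says nothing about which subtask the Flink source will hand that partition to:

```java
import java.util.Map;

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * Hypothetical Kafka producer partitioner: sends each record to the partition
 * whose index equals Flink's key group for the record key. Assumes the topic
 * has exactly MAX_PARALLELISM partitions.
 */
public class KeyGroupPartitioner implements Partitioner {

    private static final int MAX_PARALLELISM = 128; // assumed Flink maxParallelism

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // key is expected to be the SessionKey string used by Flink's keyBy
        return KeyGroupRangeAssignment.assignToKeyGroup(key, MAX_PARALLELISM);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```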

Cheers
Gyula

Re: [DISCUSSION] Using `DataStreamUtils.reinterpretAsKeyedStream` on a pre-partitioned Kafka topic

Robin Cassan
Thanks for your answers, we will have a look at adapting the Kafka source so that it assigns the input partitions according to the subtasks' assigned key groups. If anyone has already done such a thing, I'd love your advice!
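
For anyone following along, a hypothetical sanity check like the sketch below could be used to verify that every incoming `SessionKey` actually falls inside the key-group range of the subtask that received it (the `Event` type and `getSessionKey()` accessor are placeholders):

```java
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// Diagnostic only: logs events whose SessionKey does not belong to this
// subtask's key-group range, and lets every record through.
public class KeyGroupSanityCheck extends RichFilterFunction<Event> {

    private transient KeyGroupRange ownedKeyGroups;
    private transient int maxParallelism;

    @Override
    public void open(Configuration parameters) {
        maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
        ownedKeyGroups = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                maxParallelism,
                getRuntimeContext().getNumberOfParallelSubtasks(),
                getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public boolean filter(Event event) {
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(
                event.getSessionKey(), maxParallelism);
        if (!ownedKeyGroups.contains(keyGroup)) {
            System.err.println("SessionKey " + event.getSessionKey()
                    + " maps to key group " + keyGroup
                    + " but this subtask owns " + ownedKeyGroups);
        }
        return true;
    }
}
```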

Cheers

Robin
