Hi all!
We are trying to build a Flink job that consumes a Kafka topic, groups the incoming events into Session Windows according to a String that can be derived by parsing the message (we call it `SessionKey`), and does some processing on the windows before sending them to another Kafka topic.

Our first implementation used a `keyBy` operator on the incoming messages before creating the window, but we realized that we could pre-partition our data by `SessionKey` when we insert it into the input Kafka topic with a custom component. This would avoid having to shuffle data around in Flink since, for a given `SessionKey`, we would ensure that all messages with this key end up in the same Kafka partition and are therefore read by the same subtask, on a single TaskManager. This means that we should be able to create a keyed stream from the incoming data without having to transfer data between TaskManagers.

To achieve that, we used the `reinterpretAsKeyedStream` method instead of the previous `keyBy`. This got rid of the shuffling step, but we are wondering whether this is the right way to use this feature, and whether Flink can match the distribution of keys coming from Kafka with the keys assigned to each TaskManager.

We have observed that, while attempting to trigger a savepoint, we encounter exceptions that seem to indicate that the TaskManagers received data whose `SessionKey` didn't match their assigned keys. Here is one of the stack traces we saw while savepointing:

```
java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
	at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
	at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:316)
	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258)
	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:223)
	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:176)
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:392)
```

We are currently using Flink 1.8.2 on Kubernetes, savepointing to Amazon S3.

Is our observation correct that Flink cannot match the Kafka partitioning with the key groups assigned to each TaskManager? And if so, do you have any pointers on how we could pre-partition our data in Kafka so that Flink can avoid shuffling data before creating the Session Windows?

Cheers,
Robin
Hi,

From the doc[1], the DataStream MUST already be pre-partitioned in EXACTLY the same way Flink's keyBy would partition the data in a shuffle w.r.t. key-group assignment. You should make sure that each key lands in the right key group, and that each key group lands in the right parallel subtask. You can refer to KeyGroupRangeAssignment[2] for more information.

Robin Cassan <[hidden email]> wrote on Sat, Nov 30, 2019 at 12:17 AM:
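The two conditions above (key to key group, key group to subtask) can be sketched with a standalone re-implementation of the arithmetic that, to our understanding, `KeyGroupRangeAssignment` and `MathUtils.murmurHash` perform in Flink 1.8. This is not Flink's API: the class is hypothetical, method names only mirror Flink's where possible, and the logic should be checked against the Flink version you actually run. The values maxParallelism = 128 and parallelism = 64 are assumptions; they are one combination under which subtask 27 owns exactly the `KeyGroupRange{startKeyGroup=54, endKeyGroup=55}` from the stack trace above, while key group 0 belongs to subtask 0.

```java
/**
 * Hypothetical standalone copy of the key-group arithmetic that, as far as we know,
 * Flink 1.8 uses in KeyGroupRangeAssignment / MathUtils. Verify against your version.
 */
final class KeyGroupMath {

    private KeyGroupMath() {}

    // MurmurHash3-style mixing of a single int (mirrors MathUtils.murmurHash as we understand it).
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // Final avalanche step of the hash.
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // Step 1: key -> key group (depends only on key.hashCode() and maxParallelism).
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Step 2: key group -> index of the parallel subtask that owns it.
    static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Inverse view: the contiguous [start, end] key-group range owned by one subtask.
    static int[] keyGroupRangeForOperatorIndex(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed
        int parallelism = 64;     // assumed
        String sessionKey = "some-session-key"; // hypothetical SessionKey value
        int keyGroup = assignToKeyGroup(sessionKey, maxParallelism);
        int subtask = computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
        int[] range = keyGroupRangeForOperatorIndex(maxParallelism, parallelism, subtask);
        System.out.printf("key group %d -> subtask %d, which owns key groups %d-%d%n",
                keyGroup, subtask, range[0], range[1]);
    }
}
```

Since `String.hashCode()` is deterministic, the key group of a `SessionKey` could in principle be computed outside Flink (e.g. in the producer); which subtask owns that key group, however, also depends on the job's parallelism.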
Hi!

As far as I know, even if you pre-partition the data in Kafka exactly the same way using the key groups, you have no guarantee that the Kafka consumer source will pick up the right partitions. It might work if you have exactly as many Kafka partitions as key groups (i.e. the max parallelism), partitioned correctly, but even then you might have to use a custom source to get the correct partition assignment for the subtasks. Long story short, I believe the built-in Kafka source doesn't support what you want, but it should be possible to adapt it to do so.

Cheers,
Gyula

On Mon, Dec 2, 2019, 03:49 Congxian Qiu <[hidden email]> wrote:
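A small illustration of the mismatch described above. `subtaskReadingPartition` mirrors, to our understanding, `KafkaTopicPartitionAssigner.assign` from the Flink 1.8-era Kafka connector (check it against your connector version); the class, the topic name, and the "partition i carries key group i" layout are all hypothetical assumptions. The Kafka source picks a reader from the topic name hash plus the partition index, whereas keyed state assigns key group k to subtask `k * parallelism / maxParallelism`, so the two line up only by accident.

```java
/**
 * Hypothetical comparison between the stock Kafka source's partition-to-subtask
 * assignment and Flink's key-group-to-subtask assignment.
 */
final class KafkaVsKeyGroups {

    private KafkaVsKeyGroups() {}

    // Source side: consumer subtask that reads a Kafka partition
    // (mirrors, as far as we know, KafkaTopicPartitionAssigner.assign in Flink 1.8).
    static int subtaskReadingPartition(String topic, int partition, int parallelism) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        return (startIndex + partition) % parallelism;
    }

    // State side: subtask that owns a key group (KeyGroupRangeAssignment arithmetic).
    static int subtaskOwningKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Assumes one partition per key group, partition i carrying only key group i,
    // and counts partitions whose reader is not the owner of their key group.
    static int countMisaligned(String topic, int maxParallelism, int parallelism) {
        int misaligned = 0;
        for (int partition = 0; partition < maxParallelism; partition++) {
            int reader = subtaskReadingPartition(topic, partition, parallelism);
            int owner = subtaskOwningKeyGroup(maxParallelism, parallelism, partition);
            if (reader != owner) {
                misaligned++;
            }
        }
        return misaligned;
    }

    public static void main(String[] args) {
        String topic = "session-events"; // hypothetical topic name
        int misaligned = countMisaligned(topic, 128, 64);
        System.out.println(misaligned + " of 128 partitions are read by a subtask that does not own their key group");
    }
}
```

With parallelism 64 and maxParallelism 128, partitions 0 and 1 both carry key groups owned by subtask 0, yet the round-robin assignment always gives them different readers, so at least one partition is misaligned whatever the topic name.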
Thanks for your answers, we will look into adapting the Kafka source to assign the input partitions based on the assigned key groups. If anyone has already done such a thing, I'd love your advice!

Cheers,
Robin

On Mon, Dec 2, 2019 at 08:48, Gyula Fóra <[hidden email]> wrote:
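One possible shape for the partition-selection logic of such an adapted source, sketched under loudly stated assumptions: the input topic has exactly maxParallelism partitions, the producer writes every record to the partition whose index equals the key group of its `SessionKey`, and the source runs with the same parallelism and maxParallelism as the windowed operator. Under those assumptions, each source subtask would simply consume the partitions that coincide with its own key-group range. The class and method are hypothetical; this is not part of the Flink Kafka connector.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical partition selection for a customized Kafka source, assuming
 * partition index == key group and partition count == maxParallelism.
 */
final class KeyGroupAlignedPartitions {

    private KeyGroupAlignedPartitions() {}

    // Partitions (== key groups) that a given source subtask should consume:
    // the same contiguous range that the keyed operator's subtask owns.
    static List<Integer> partitionsForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        List<Integer> partitions = new ArrayList<>();
        for (int partition = start; partition <= end; partition++) {
            partitions.add(partition);
        }
        return partitions;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed, must equal the number of Kafka partitions
        int parallelism = 3;      // assumed, must equal the keyed operator's parallelism
        for (int i = 0; i < parallelism; i++) {
            List<Integer> parts = partitionsForSubtask(maxParallelism, parallelism, i);
            System.out.println("subtask " + i + " -> partitions "
                    + parts.get(0) + ".." + parts.get(parts.size() - 1));
        }
    }
}
```

This prints `subtask 0 -> partitions 0..42`, `subtask 1 -> partitions 43..85` and `subtask 2 -> partitions 86..127`. Inside a real source function, the three inputs would presumably come from the `RuntimeContext` (`getMaxNumberOfParallelSubtasks()`, `getNumberOfParallelSubtasks()`, `getIndexOfThisSubtask()`). Tying the partition count to maxParallelism rather than to the parallelism means that, after rescaling, each subtask would recompute its range and still read exactly the key groups it owns.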