Hi guys,
I have a question about using watermarks with multiple FlinkKafkaConsumer082 instances. The aim is to have a global watermark across multiple Kafka consumers, where any message from any Kafka partition would update the same watermark. When testing
a simple TimestampExtractor implementation, it seems each consumer produces a separate watermark. Is there a prescribed way of handling this that anyone has experience with?
Thanks for your help,
Andrew Griess
|
Hi Andrew,

as far as I know, there is no prescribed way of handling this kind of situation. If you want to synchronize the watermark generation across a set of KafkaConsumers, you need some kind of ground truth. This could be, for example, a central registry such as ZooKeeper in which you collect the current watermarks of the different consumers. You could access ZooKeeper from inside the TimestampExtractor.

Alternatively, though a bit more hacky, you could exploit the fact that consumer tasks are usually colocated with consumer tasks from different topics. This means that you'll have multiple subtasks reading from the different Kafka topics running in the same JVM. You could then use class variables to synchronize the watermarks. But this assumes that each subtask reading the topic t from Kafka is colocated with at least one other subtask reading the topic t' from Kafka, with t' in T \ {t} and T being the set of Kafka topics. By default this should be the case.

I'm wondering why you need a global watermark for your Kafka topics. Isn't it enough that you have individual watermarks for each topic?

Cheers,
Till

On Tue, Dec 15, 2015 at 4:45 PM, Griess, Andrew <[hidden email]> wrote:
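
To make the class-variable idea from Till's reply concrete, here is a minimal plain-Java sketch. It is an illustration only, not Flink API: the class name SharedWatermark and its methods are hypothetical, and it assumes the "global" watermark is simply the largest timestamp seen by any subtask in the same JVM. Subtasks running in other JVMs (other TaskManagers) would not see this state.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical JVM-wide holder for a shared watermark.
 * A static field is shared by all subtasks running in the same JVM,
 * which is the colocation assumption described in the reply above.
 */
final class SharedWatermark {
    // Largest timestamp observed so far by any caller in this JVM.
    private static final AtomicLong MAX_TIMESTAMP = new AtomicLong(Long.MIN_VALUE);

    private SharedWatermark() {}

    /** Records a timestamp and returns the shared watermark after the update. */
    static long observe(long timestamp) {
        // accumulateAndGet applies Math::max atomically, so concurrent
        // subtasks can never move the shared value backwards.
        return MAX_TIMESTAMP.accumulateAndGet(timestamp, Math::max);
    }

    /** Returns the shared watermark without updating it. */
    static long current() {
        return MAX_TIMESTAMP.get();
    }
}
```

Each extractor instance would call observe(...) with the timestamp of the record it just saw and use the returned value as its watermark. Note that taking the maximum means a fast partition drags the watermark forward for everyone, so records from slower partitions may arrive behind it.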
|
@Andrew: Just to make sure that there is no confusion: even though every Kafka source generates a local watermark, these watermarks are merged when the streams are merged, for example in union() or keyBy() steps. The operator that merges streams tracks the current watermark of each input stream and emits its own watermark accordingly (the minimum across its inputs). That way, most cases do not need any global watermark coordination.

Here is an example:

    DataStream<X> kafka1 = env.addSource(new KafkaConsumer("topicA", ...)).setParallelism(2); // assume this has 2 Kafka partitions
    DataStream<X> kafka2 = env.addSource(new KafkaConsumer("topicB", ...)).setParallelism(1); // assume this has 1 Kafka partition

    kafka1.keyBy(...).map(...).union(kafka2);

Now assume this:

- Kafka source 1 (subtask 1) emits watermark 17
- Kafka source 1 (subtask 2) emits watermark 21
- Kafka source 2 (subtask 1) emits watermark 5
- The map() after the keyBy receives the watermarks from the two subtasks of kafka1 and has its watermark at 17 (the min)
- The union() has the watermark at 5

- Kafka source 1 (subtask 1) emits watermark 24
- Kafka source 1 (subtask 2) emits watermark 22
- Kafka source 2 (subtask 1) emits watermark 27
- The map() after the keyBy now has the watermark at 22
- The union() has the watermark at 22 as well now

Hope that illustrates Flink's mechanism a bit. Do you think this mechanism handles your watermark coordination?

Greetings,
Stephan

On Wed, Dec 16, 2015 at 11:06 AM, Till Rohrmann <[hidden email]> wrote:
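
The numbers in Stephan's walkthrough can be checked with a small plain-Java simulation. This is a sketch of the "track one watermark per input, emit the minimum" rule described in the reply, not Flink's actual implementation; the class names MinWatermarkTracker and WatermarkWalkthrough are made up for illustration, and the map() is treated as a single operator with two inputs, as in the example.

```java
import java.util.Arrays;

/** Tracks the latest watermark per input and exposes the minimum over all inputs. */
final class MinWatermarkTracker {
    private final long[] inputWatermarks;
    private long current = Long.MIN_VALUE;

    MinWatermarkTracker(int numInputs) {
        inputWatermarks = new long[numInputs];
        // Until an input has reported, it holds the operator's watermark back.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the operator's watermark. */
    long onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        current = Math.max(current, min); // the emitted watermark never decreases
        return current;
    }

    long current() {
        return current;
    }
}

final class WatermarkWalkthrough {
    public static void main(String[] args) {
        MinWatermarkTracker map = new MinWatermarkTracker(2);   // inputs: kafka1 subtasks 1 and 2
        MinWatermarkTracker union = new MinWatermarkTracker(2); // inputs: map output, kafka2

        // First round: 17 and 21 from kafka1, 5 from kafka2.
        union.onWatermark(0, map.onWatermark(0, 17));
        union.onWatermark(0, map.onWatermark(1, 21));
        union.onWatermark(1, 5);
        System.out.println("map=" + map.current() + " union=" + union.current()); // map=17 union=5

        // Second round: 24 and 22 from kafka1, 27 from kafka2.
        union.onWatermark(0, map.onWatermark(0, 24));
        union.onWatermark(0, map.onWatermark(1, 22));
        union.onWatermark(1, 27);
        System.out.println("map=" + map.current() + " union=" + union.current()); // map=22 union=22
    }
}
```

The slowest input always determines the merged watermark, which is why the union() stays at 5 until Kafka source 2 catches up.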
|