Hi,

I am reading about watermark creation for Kafka streams in the documentation here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

It gives an example where the watermark assigner is attached directly to the consumer, like so (solution 1):

    FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
    myConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
    env.addSource(myConsumer)....

We can then add it as a source and continue with the application.

My question is: would there be any (or much) difference compared with assigning watermarks after the source? Something like this (solution 2):

    FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
    env
        .addSource(myConsumer)
        .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));

I can imagine that it would create an extra operator, but is there any other [unnecessary] overhead that solution 2 introduces compared with solution 1? I tried running a simple job, but I couldn't see much difference. I would like to know if there is something I am unaware of and could do better.

Regards,
Nikola Hrusov
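For reference, both solutions use the same `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))`. In Flink this is backed by the `BoundedOutOfOrdernessWatermarks` generator, which remembers the highest event timestamp seen so far and periodically emits a watermark of that maximum minus the bound minus one millisecond. The plain-Java sketch below mimics that rule without a Flink dependency; the class name `BoundedOutOfOrdernessSketch` is hypothetical and it is an illustration of the rule, not Flink's own class.

```java
import java.time.Duration;

/** Hypothetical stand-in for the bounded-out-of-orderness rule; not Flink's class. */
final class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start so that the first emitted watermark is Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Called for every record: remember the largest event timestamp seen so far. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Called periodically: assumes nothing older than (max - bound) is still to come. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(Duration.ofSeconds(20));
        System.out.println(gen.currentWatermark() == Long.MIN_VALUE); // true
        gen.onEvent(100_000L);
        gen.onEvent(90_000L); // an out-of-order record does not move the watermark back
        System.out.println(gen.currentWatermark()); // 79999
    }
}
```

The rule itself is identical in both solutions; what differs is which stream of records each generator instance observes.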
Hi Nikola,
Apart from the potential overhead you mentioned, namely having one more operator, I cannot find any other, and I think even that one is negligible.

The reason we recommend attaching the Watermark Generator to the source is semantics rather than efficiency: it seems natural for a pipeline whose logic depends on event time to have its Watermarks generated at the source.

Cheers,
Kostas

On Sun, Nov 8, 2020 at 8:14 PM Nikola Hrusov <[hidden email]> wrote:
>
> Hi,
>
> I am reading about the watermark creation of the kafka streams using the article here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>
> In there, it is a given example where the watermark assigner is directly attached to the consumer like so (solution 1):
>
>> FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
>> myConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>> env.addSource(myConsumer)....
>
> Then we can use that by adding it as a source and continue with the application.
>
> My question is, would that have any/much difference against doing it after the source? Something like this (solution 2):
>
>> FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
>> env
>>     .addSource(myConsumer)
>>     .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>
> I can eventually think that it would create an extra operator, but is there any other [unnecessary] overhead that solution 2 will give over solution 1?
> I tried running a simple job, but I couldn't see much difference. I would like to know if there is something I am unaware of and I can do better.
>
> Regards,
> Nikola Hrusov
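One concrete form of the semantic difference Kostas mentions: the Kafka connector documentation linked in the question states that when the strategy is set on the consumer (solution 1), watermarks are generated inside the consumer per Kafka partition, and the per-partition watermarks are merged the same way watermarks are merged on stream shuffles, i.e. by taking the minimum. With solution 2, one generator per parallel subtask sees the interleaved records of every partition that subtask reads. The plain-Java sketch below compares the two under an assumed scenario (one subtask reading two partitions, one of which lags behind); the class name and data are hypothetical, and it simulates the idea rather than calling Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of per-partition vs. per-subtask watermarking; not Flink code. */
final class PerPartitionWatermarkSketch {
    static final long BOUND_MS = 20_000L; // same 20-second bound as in the question

    // Bounded-out-of-orderness rule: highest timestamp seen minus the bound minus 1 ms.
    static long watermarkOf(List<Long> timestamps) {
        long max = Long.MIN_VALUE + BOUND_MS + 1;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return max - BOUND_MS - 1;
    }

    public static void main(String[] args) {
        // Hypothetical data: partition 0 is ahead, partition 1 lags behind.
        List<Long> partition0 = List.of(100_000L, 110_000L, 120_000L);
        List<Long> partition1 = List.of(30_000L, 35_000L, 40_000L);

        // Solution 1 style: one generator per partition, merged by taking the minimum.
        long perPartition = Math.min(watermarkOf(partition0), watermarkOf(partition1));

        // Solution 2 style: one generator over the subtask's interleaved records.
        List<Long> interleaved = new ArrayList<>(partition0);
        interleaved.addAll(partition1);
        long perSubtask = watermarkOf(interleaved);

        System.out.println(perPartition); // 19999
        // 99999: partition-1 records arriving after this watermark are behind it (late)
        System.out.println(perSubtask);
    }
}
```

The per-partition variant holds the watermark back for the lagging partition, whereas the single generator lets the faster partition push it forward. Per the same documentation, the trade-off is that an idle partition can then stall the merged watermark.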