Understanding kafka watermark strategy assigner

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Understanding kafka watermark strategy assigner

Nikola Hrusov
Hi,

In there, it is a given example where the watermark assigner is directly attached to the consumer like so (solution 1):

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
env.addSource(myConsumer)....

Then we can use that by adding it as a source and continue with the application.

My question is, would that have any/much difference against doing it after the source? Something like this (solution 2):
 
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
env 
.addSource(myConsumer) 
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));

I can eventually think that it would create an extra operator, but is there any other [unnecessary] overhead that solution 2 will give over solution 1?
I tried running a simple job, but I couldn't see much difference. I would like to know if there is something I am unaware of and I can do better.
 
Regards
,
Nikola Hrusov

Reply | Threaded
Open this post in threaded view
|

Re: Understanding kafka watermark strategy assigner

Kostas Kloudas-5
Hi Nikola,

Apart from the potential overhead you mentioned about having one more
operator, I cannot find any other. Also even this one I think is
negligible.
The reason why we recommend attaching the Watermark Generator to the
source is more about semantics rather than efficiency. It seems
natural for a pipeline whose logic depends on event time to have its
Watermarks generated at the source.

Cheers,
Kostas

On Sun, Nov 8, 2020 at 8:14 PM Nikola Hrusov <[hidden email]> wrote:

>
> Hi,
>
> I am reading about the watermark creation of the kafka streams using the article here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>
> In there, it is a given example where the watermark assigner is directly attached to the consumer like so (solution 1):
>
>> FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
>> myConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>> env.addSource(myConsumer)....
>
>
> Then we can use that by adding it as a source and continue with the application.
>
> My question is, would that have any/much difference against doing it after the source? Something like this (solution 2):
>
>>
>> FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
>> env
>>
>> .addSource(myConsumer)
>>
>> .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>
>
> I can eventually think that it would create an extra operator, but is there any other [unnecessary] overhead that solution 2 will give over solution 1?
> I tried running a simple job, but I couldn't see much difference. I would like to know if there is something I am unaware of and I can do better.
>
> Regards,
> Nikola Hrusov
>