Kafka 0.10.x event time with windowing

Kafka 0.10.x event time with windowing

Jia Teoh
Hi,

I'm trying to use FlinkKafkaConsumer010 as the source for a windowing job that runs on event time, using the timestamps provided by Kafka. Following the Kafka connector docs (link), I've set the time characteristic to event time (streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)) and am using FlinkKafkaConsumer010 with Kafka 0.10.2. I've also set up windowing with "stream.windowAll(TumblingEventTimeWindows.of(Time.milliseconds(windowMillis)))" (timeWindowAll appears to behave the same way).
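To make the expected behavior concrete, here is a minimal plain-Java sketch (no Flink dependency) of how an epoch-aligned tumbling window is chosen for a timestamp and when an event-time trigger may fire it. It assumes zero window offset and uses 1000 ms as a stand-in for windowMillis; the class and method names are hypothetical. The point it illustrates: a record's timestamp only decides which window it lands in, while firing depends on the watermark reaching the window's last timestamp.

```java
// Illustrative model of epoch-aligned tumbling event-time windows (not Flink's classes).
public class TumblingWindowMath {

    // Start of the [start, start + size) window that contains `timestamp`.
    // floorMod keeps the result correct for negative timestamps as well.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Largest timestamp that still belongs to the window.
    static long windowMaxTimestamp(long start, long size) {
        return start + size - 1;
    }

    // An event-time trigger can only fire once the watermark reaches the window's max timestamp.
    static boolean canFire(long watermark, long start, long size) {
        return watermark >= windowMaxTimestamp(start, size);
    }

    public static void main(String[] args) {
        long size = 1000L; // hypothetical windowMillis
        long start = windowStart(1494998828813L, size);
        System.out.println(start);                                // 1494998828000
        System.out.println(canFire(Long.MIN_VALUE, start, size)); // false: no watermark yet
        System.out.println(canFire(1494998829000L, start, size)); // true
    }
}
```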

With this configuration I can verify that data is read from Kafka. However, the event-time windows never trigger, even after data has been loaded for much longer than the window size. Is there an additional configuration I am missing?

I have verified that the Kafka messages have timestamps. The docs say there is no need for a timestamp extractor, but using one that explicitly assigns the current time does make the windows trigger.
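The symptom can be reproduced with a small, hypothetical model of an event-time window operator (plain Java, not Flink's implementation): timestamped records are buffered per window, but a window is only emitted when a watermark passes its end. With no watermark the operator stays silent no matter how much data arrives, which fits the observation above that adding an explicit extractor makes the windows fire, since in Flink 1.x the timestamp extractor interfaces also produce watermarks. The window size and sample timestamps are assumptions taken from this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of a tumbling event-time window operator (not Flink code).
public class WatermarkDrivenWindows {
    private final long size;
    // Window start -> number of buffered records in that window, ordered by start.
    private final TreeMap<Long, Integer> pending = new TreeMap<>();
    // Like an operator that has never received a watermark.
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkDrivenWindows(long size) {
        this.size = size;
    }

    // Buffer a timestamped record in its epoch-aligned window; never emits anything.
    void onElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        pending.merge(start, 1, Integer::sum);
    }

    // Advance the watermark and emit (as record counts) every window whose end it has passed.
    List<Integer> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Integer> fired = new ArrayList<>();
        while (!pending.isEmpty() && pending.firstKey() + size - 1 <= currentWatermark) {
            fired.add(pending.pollFirstEntry().getValue());
        }
        return fired;
    }

    public static void main(String[] args) {
        WatermarkDrivenWindows op = new WatermarkDrivenWindows(1000L);
        for (long ts : new long[] {1494998828813L, 1494998828901L, 1494998829004L, 1494998829016L}) {
            op.onElement(ts);
        }
        System.out.println(op.onWatermark(Long.MIN_VALUE)); // []: timestamps alone fire nothing
        System.out.println(op.onWatermark(1494998830000L)); // [2, 2]: both windows complete
    }
}
```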

Thanks,
Jia Teoh

Re: Kafka 0.10.x event time with windowing

Tzu-Li (Gordon) Tai
Hi Jia!

This sounds a bit fishy. The docs mention that there is no need for a timestamp / watermark extractor because with 0.10, the timestamps that come with Kafka records can be used directly to produce watermarks for event time.

One quick clarification: did you also check whether the timestamps that come with the Kafka 0.10 records are sound and reasonable?

Cheers,
Gordon

On 17 May 2017 at 4:45:19 AM, Jia Teoh wrote:

Re: Kafka 0.10.x event time with windowing

Jia Teoh
Hi Gordon,

Thanks for confirming my understanding that no extractor should be needed for 0.10. However, without an extractor I still get zero window triggers.

I've verified the timestamps in the Kafka records with the following command:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input --from-beginning --property print.timestamp=true
The 'input' topic consists of strings representing the time each record was created. I get output such as the following:
CreateTime:1494998828813        1494998828813
CreateTime:1494998828901        1494998828901
CreateTime:1494998828914        1494998828914
CreateTime:1494998828915        1494998828914
CreateTime:1494998828915        1494998828915
CreateTime:1494998829004        1494998829003
CreateTime:1494998829016        1494998829016
CreateTime:1494998829016        1494998829016
where CreateTime is the Kafka record timestamp and the second value is the record value, here the time the producer created the record. With the CreateTime timestamp type, Kafka keeps the timestamp set by the producer, so both values come from the producer's clock (hence the nearly identical values). The producer resides on the same machine as the Kafka broker.
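Gordon's question about whether the timestamps are sound can also be checked a bit more systematically. Below is a hypothetical plain-Java helper that parses lines in the console-consumer format shown above and reports the largest amount by which a record's timestamp lags the maximum seen before it; that figure is a reasonable starting point for the out-of-orderness bound of a watermark generator. It assumes one record per line: the timestamp type, a colon, the timestamp, whitespace, then the value.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: measure timestamp disorder in kafka-console-consumer.sh output
// printed with --property print.timestamp=true, e.g. "CreateTime:1494998828813\t1494998828813".
public class TimestampDisorder {

    // Parse the millisecond timestamp between the first ':' and the first whitespace.
    static long parseTimestamp(String line) {
        int colon = line.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("no timestamp type prefix: " + line);
        }
        String rest = line.substring(colon + 1).trim();
        return Long.parseLong(rest.split("\\s+", 2)[0]);
    }

    // Largest lag (ms) of a timestamp behind the maximum seen earlier; 0 means non-decreasing.
    static long maxOutOfOrderness(List<String> lines) {
        long maxSeen = Long.MIN_VALUE;
        long worstLag = 0L;
        for (String line : lines) {
            long ts = parseTimestamp(line);
            if (ts < maxSeen) {
                worstLag = Math.max(worstLag, maxSeen - ts);
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return worstLag;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "CreateTime:1494998828813\t1494998828813",
                "CreateTime:1494998828915\t1494998828914",
                "CreateTime:1494998829004\t1494998829003",
                "CreateTime:1494998829016\t1494998829016");
        System.out.println(maxOutOfOrderness(sample)); // 0: this sample is already in order
    }
}
```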

-Jia

On Tue, May 16, 2017 at 9:30 PM, Tzu-Li (Gordon) Tai wrote:

Re: Kafka 0.10.x event time with windowing

Tzu-Li (Gordon) Tai
Ah, my apologies, there was some misunderstanding on my side here.

FlinkKafkaConsumer010 attaches the Kafka timestamps to the records, so a timestamp extractor is not required, BUT you’ll still need a watermark generator to produce the watermarks. That should explain why the windows aren’t firing.
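Following this explanation, a watermark generator for the 0.10 consumer can keep the timestamp Kafka already attached and only add watermarks. The sketch below is a plain-Java model with no Flink dependency; its two methods mirror the shape of Flink 1.x's AssignerWithPeriodicWatermarks (extractTimestamp(element, previousElementTimestamp) and getCurrentWatermark()), but the class name, the long return type of getCurrentWatermark (Flink returns a Watermark object), and the 100 ms out-of-orderness bound are assumptions for illustration.

```java
// Plain-Java model of a periodic watermark assigner for records whose timestamps were
// already attached by the Kafka 0.10 consumer. Not Flink code; names are illustrative.
public class KafkaTimestampWatermarks<T> {
    private final long maxOutOfOrdernessMs; // assumed bound on how late a record may arrive
    private long maxTimestampSeen = Long.MIN_VALUE;

    KafkaTimestampWatermarks(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Pass through the timestamp the consumer attached (the Kafka record timestamp)
    // rather than parsing one from the element, and remember the largest one seen.
    long extractTimestamp(T element, long previousElementTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, previousElementTimestamp);
        return previousElementTimestamp;
    }

    // Called periodically; the watermark trails the largest timestamp by the bound.
    // Stays at Long.MIN_VALUE until the first record arrives, so no window can fire yet.
    long getCurrentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        KafkaTimestampWatermarks<String> assigner = new KafkaTimestampWatermarks<>(100L);
        System.out.println(assigner.getCurrentWatermark() == Long.MIN_VALUE); // true
        for (long kafkaTs : new long[] {1494998828813L, 1494998829004L, 1494998828901L}) {
            assigner.extractTimestamp("record value", kafkaTs);
        }
        System.out.println(assigner.getCurrentWatermark()); // 1494998828904
    }
}
```

In an actual job, a class like this would implement org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks<T> and be registered on the consumer with assignTimestampsAndWatermarks(...); how often getCurrentWatermark() is called is governed by the auto-watermark interval (ExecutionConfig.setAutoWatermarkInterval).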

I agree it’s a bit confusing for the Kafka010 consumer at the moment, because timestamp extraction and watermark generation are bound to a single interface. I think there are plans to separate them in the future.

Cheers,
Gordon

On 17 May 2017 at 1:34:31 PM, Jia Teoh wrote:

Re: Kafka 0.10.x event time with windowing

Jia Teoh
Hi Gordon,

Thank you for the clarification. In that case, it sounds like I should use a custom extractor that emits watermarks in addition to passing through the Kafka timestamps. I see that the docs already have code demonstrating how to emit the timestamps, so I will start there.

Thanks,
Jia

On Tue, May 16, 2017 at 10:42 PM, Tzu-Li (Gordon) Tai wrote: