About KafkaConsumer, watermarking, and EventTime characteristics

Vishal Santoshi
When one needs to use the Kafka event time (ingestion time) for watermark generation and timestamp extraction, is setting the TimeCharacteristic to EventTime enough?

Or is this explicit code required?
consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<KafkaRecord>() {
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(KafkaRecord lastElement, long extractedTimestamp) {
        // Emit a watermark equal to the timestamp of every record.
        return new Watermark(extractedTimestamp);
    }

    @Override
    public long extractTimestamp(KafkaRecord element, long previousElementTimestamp) {
        // Pass through the timestamp previously attached to the element.
        return previousElementTimestamp;
    }
});
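What a per-record punctuated watermark implies can be sketched in plain Java with no Flink dependency. This is only a model under stated assumptions: the class `PerRecordWatermarkModel` and its methods are hypothetical, the watermark is assumed to move forward only, and a record is treated as late when its timestamp is at or below the watermark already seen.

```java
/** Hypothetical stand-alone model of a per-record watermark; not a Flink class. */
class PerRecordWatermarkModel {

    // Assumption: before any record arrives, the watermark is at its minimum value.
    private long currentWatermark = Long.MIN_VALUE;

    /**
     * Observes one record timestamp, mirroring an assigner that returns
     * a watermark equal to each record's timestamp.
     *
     * @return true if the record is late relative to the watermark seen so far
     */
    boolean observe(long recordTimestamp) {
        boolean late = recordTimestamp <= currentWatermark;
        // Assumption: a candidate watermark is only adopted if it is larger
        // than the current one, so the watermark never moves backwards.
        if (recordTimestamp > currentWatermark) {
            currentWatermark = recordTimestamp;
        }
        return late;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        PerRecordWatermarkModel model = new PerRecordWatermarkModel();
        long[] timestamps = {1000L, 3000L, 2000L, 4000L};
        for (long ts : timestamps) {
            boolean late = model.observe(ts);
            System.out.println("ts=" + ts + " late=" + late
                    + " watermark=" + model.currentWatermark());
        }
    }
}
```

In this model the third record (2000) arrives after the watermark has already reached 3000, so it counts as late. With out-of-order topics, that is the main cost of advancing the watermark on every record.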

Vishal Santoshi
It seems from https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html that TimeCharacteristic.IngestionTime should do the trick.

I just wanted to confirm that the ingestion time is the event time provided by the Kafka producer.

On Tue, Jan 29, 2019 at 3:21 PM Vishal Santoshi <[hidden email]> wrote:

Congxian Qiu

On Wed, Jan 30, 2019 at 4:36 AM Vishal Santoshi <[hidden email]> wrote:

Vishal Santoshi
Thank you. This, though, is a little different.

The producer of the Kafka message attaches a timestamp (https://issues.apache.org/jira/browse/KAFKA-2511). I do not see how I can get to that timestamp through any stream abstraction over the FlinkKafkaConsumer API, even though it is available in ConsumerRecord (https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html) and is used here: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141

All I want to do is this:

* Pull from a Kafka topic. Each Kafka record in this topic is written with a timestamp.
* Write to HDFS using StreamingFileSink, BUT make buckets that honor the ingestion-time watermark.

The question is:

If we set the TimeCharacteristic to IngestionTime, does the context's watermark in getBucketId(KafkaRecord element, Context context) of BucketAssigner reflect the Kafka record timestamp in https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html, given that "automatic timestamp assignment and automatic watermark generation" is done when the TimeCharacteristic is IngestionTime?
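To make the bucketing goal above concrete, here is a minimal sketch of the timestamp-to-bucket mapping in plain Java. The class `TimestampBuckets`, the method `bucketIdFor`, the hourly `yyyy-MM-dd--HH` layout, and the choice of UTC are all assumptions for illustration and do not come from the thread. In a BucketAssigner, such a helper could be called from getBucketId with the element timestamp or the current watermark taken from the Context, provided that value really carries the Kafka record timestamp.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that maps an epoch-millisecond timestamp to an hourly bucket id. */
class TimestampBuckets {

    // Assumption: hourly buckets named in UTC, for example "2019-01-30--02".
    private static final DateTimeFormatter HOURLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    /** Returns the bucket id for a timestamp given in milliseconds since the epoch. */
    static String bucketIdFor(long epochMillis) {
        return HOURLY.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // 1548816300000L is 2019-01-30T02:45:00Z.
        System.out.println(bucketIdFor(1548816300000L));
    }
}
```

Keeping the mapping a pure function of the timestamp means the same record always lands in the same bucket, regardless of when Flink happens to read it.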


Regards.

On Tue, Jan 29, 2019 at 8:42 PM Congxian Qiu <[hidden email]> wrote:

Jamie Grier-2
Vishal, the answer to your question about IngestionTime is "no". Ingestion time in this context means the time the data was read by Flink, not the time it was written to Kafka.

To get the effect you're looking for, you have to set TimeCharacteristic.EventTime and follow the instructions here: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010

You still need the code you provided in your original email above, and you also have to do:

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config =
        FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
config.setWriteTimestampToKafka(true);
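The distinction between the two notions of time discussed in this reply can be illustrated with a small stand-alone model in plain Java. Everything in it is hypothetical (`TimestampSourceModel`, `StampedRecord`, and the two methods are not Flink or Kafka classes); it only shows which clock each notion reads from.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-alone model; none of these types are Flink or Kafka classes. */
class TimestampSourceModel {

    /** A record carrying the timestamp that was attached when it was written to Kafka. */
    static final class StampedRecord {
        final String value;
        final long kafkaTimestampMillis;

        StampedRecord(String value, long kafkaTimestampMillis) {
            this.value = value;
            this.kafkaTimestampMillis = kafkaTimestampMillis;
        }
    }

    /** Event time as wanted in this thread: the timestamp carried by the record itself. */
    static long eventTime(StampedRecord record) {
        return record.kafkaTimestampMillis;
    }

    /** Ingestion time: the reader's clock when the record is read; the record's own timestamp is ignored. */
    static long ingestionTime(StampedRecord record, LongSupplier readerClock) {
        return readerClock.getAsLong();
    }

    public static void main(String[] args) {
        StampedRecord record = new StampedRecord("payload", 1548816300000L);
        System.out.println("event time:     " + eventTime(record));
        System.out.println("ingestion time: " + ingestionTime(record, System::currentTimeMillis));
    }
}
```

Replaying the same topic later yields the same event times but different ingestion times, which is why buckets keyed on the Kafka timestamp need the event-time setup described above.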



On Wed, Jan 30, 2019 at 2:45 AM Vishal Santoshi <[hidden email]> wrote: