In the case where one needs to use Kafka event time (ingestion time) for watermark generation and timestamp extraction, is setting the TimeCharacteristic to EventTime enough?
Or is this explicit code required? consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<KafkaRecord>() { |
It seems from https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html that TimeCharacteristic.IngestionTime should do the trick. I just wanted to confirm that the ingestion time is the event time provided by the Kafka producer. On Tue, Jan 29, 2019 at 3:21 PM Vishal Santoshi <[hidden email]> wrote:
|
Hi Vishal, this doc[1] may be helpful for you. Best, Congxian On Wed, Jan 30, 2019 at 4:36 AM Vishal Santoshi <[hidden email]> wrote:
|
Thank you. This though is a little different. The producer of the Kafka message attaches a timestamp (https://issues.apache.org/jira/browse/KAFKA-2511). I do not see how I can get to that timestamp through any stream abstraction over the FlinkKafkaConsumer API, even though it is available here: https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html and is being used here: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141

All I want to do is this:
* Pull from a Kafka topic. This topic is being written to with a timestamp on each Kafka record.
* Write to HDFS using StreamingSink, BUT make buckets that honor the ingestion time's watermark.

The question is: if we have TimeCharacteristic as IngestionTime, does the context's watermark in getBucketId(KafkaRecord element, Context context) in BucketAssigner reflect the Kafka record timestamp in https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html, given that "automatic timestamp assignment and automatic watermark generation" is done if TimeCharacteristic is IngestionTime (here)?

Regards.
On Tue, Jan 29, 2019 at 8:42 PM Congxian Qiu <[hidden email]> wrote:
|
Vishal, the answer to your question about IngestionTime is "no". Ingestion time in this context means the time the data was read by Flink, not the time it was written to Kafka. To get the effect you're looking for, you have to set TimeCharacteristic.EventTime and follow the instructions here: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010 You still need the code you provided in your original email above and you also have to do:
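A minimal sketch of the idea in the linked documentation section, not the exact code from this message: with the Kafka 0.10+ consumer and TimeCharacteristic.EventTime, the previousElementTimestamp argument of extractTimestamp() carries the timestamp of the Kafka record, so the assigner can simply return it. To keep the example runnable without Flink on the classpath, PunctuatedAssigner below is a local stand-in that mirrors the two methods of Flink's AssignerWithPunctuatedWatermarks (the real interface returns a Watermark object, not a long). KafkaRecord, the "timestamp minus one" watermark rule, and the hourly bucket pattern are assumptions made for illustration only.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class KafkaTimestampSketch {

    // Hypothetical stand-in for the user's record type named in the thread.
    static final class KafkaRecord {
        final String payload;
        KafkaRecord(String payload) { this.payload = payload; }
    }

    // Local stand-in mirroring the method names of Flink's
    // AssignerWithPunctuatedWatermarks. Assumption: simplified so the sketch
    // compiles on its own; the real interface returns a Watermark object.
    interface PunctuatedAssigner<T> {
        long extractTimestamp(T element, long previousElementTimestamp);
        long checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
    }

    static final class KafkaTimestampAssigner implements PunctuatedAssigner<KafkaRecord> {
        @Override
        public long extractTimestamp(KafkaRecord element, long previousElementTimestamp) {
            // The consumer has already attached the Kafka record timestamp,
            // so it is passed through unchanged.
            return previousElementTimestamp;
        }

        @Override
        public long checkAndGetNextWatermark(KafkaRecord lastElement, long extractedTimestamp) {
            // Illustrative rule: assumes timestamps never decrease within a
            // partition, and leaves room for another record with the same timestamp.
            return extractedTimestamp - 1;
        }
    }

    // Hypothetical hourly bucket pattern, evaluated in UTC.
    static final DateTimeFormatter HOURLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    // Derives a bucket id from an element timestamp (epoch milliseconds),
    // as a custom bucket assigner could do instead of using processing time.
    static String bucketIdFor(long timestampMillis) {
        return HOURLY.format(Instant.ofEpochMilli(timestampMillis));
    }

    public static void main(String[] args) {
        KafkaTimestampAssigner assigner = new KafkaTimestampAssigner();
        KafkaRecord record = new KafkaRecord("example");
        long kafkaTimestamp = 1548800000000L; // timestamp written by the producer

        long eventTime = assigner.extractTimestamp(record, kafkaTimestamp);
        long watermark = assigner.checkAndGetNextWatermark(record, eventTime);

        System.out.println("event time = " + eventTime);
        System.out.println("watermark  = " + watermark);
        System.out.println("bucket id  = " + bucketIdFor(eventTime));
    }
}
```

In a real job the same two method bodies would go into the anonymous AssignerWithPunctuatedWatermarks<KafkaRecord> passed to consumer.assignTimestampsAndWatermarks(...), and the bucket id would be derived from the element timestamp exposed by the bucket assigner's context rather than from processing time.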
On Wed, Jan 30, 2019 at 2:45 AM Vishal Santoshi <[hidden email]> wrote:
|