Record timestamp from kafka

Navneeth Krishnan
Hi,

Is there a way to get the Kafka timestamp in a deserialization schema? All records are written to Kafka with a timestamp, and I would like to assign that timestamp to every record that is ingested. Thanks.

Re: Record timestamp from kafka

Ben Yan
Hi,
Is this what you mean?
See: https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16377145#comment-16377145

Best
Ben

In reply to this post by Navneeth Krishnan (30 Mar 2018, 12:23 PM)

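A minimal sketch of what FLINK-8500 asks for, assuming Flink 1.8 or later, where (to my understanding) the issue was resolved by a `KafkaDeserializationSchema` interface whose `deserialize` method receives the whole Kafka `ConsumerRecord`, including its timestamp. `TimestampedEvent` and `TimestampedEventSchema` are hypothetical names, and the message value is assumed to be UTF-8 text.

```scala
import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

// Hypothetical element type: the message text plus the Kafka record timestamp
// (CreateTime or LogAppendTime, depending on how the topic is configured).
case class TimestampedEvent(payload: String, kafkaTimestamp: Long)

// Hypothetical schema that copies each Kafka record's timestamp into the element.
class TimestampedEventSchema extends KafkaDeserializationSchema[TimestampedEvent] {

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): TimestampedEvent = {
    // Tombstone records have a null value; map them to an empty payload.
    val text = Option(record.value())
      .map(bytes => new String(bytes, StandardCharsets.UTF_8))
      .getOrElse("")
    TimestampedEvent(text, record.timestamp())
  }

  // A Kafka topic is unbounded, so never signal the end of the stream.
  override def isEndOfStream(nextElement: TimestampedEvent): Boolean = false

  // Type information derived by the macro from Flink's Scala API import.
  override def getProducedType(): TypeInformation[TimestampedEvent] =
    createTypeInformation[TimestampedEvent]
}
```

Such a schema would be passed to the consumer, e.g. `new FlinkKafkaConsumer[TimestampedEvent]("my-topic", new TimestampedEventSchema, props)`, where `my-topic` and `props` are placeholders. Before Flink 1.8, `KeyedDeserializationSchema.deserialize` only receives the key, value, topic, partition, and offset, which is why the timestamp could not be read in a deserialization schema at the time of this thread.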

Re: Record timestamp from kafka

Fabian Hueske-2
Hi Navneeth,

Flink's Kafka consumer (for Kafka 0.10 and later) automatically attaches each Kafka record's timestamp if you configure event time for the application [1].
Since Flink treats record timestamps as metadata, most functions cannot access them directly. You can implement a ProcessFunction [2] and read a record's timestamp from the ProcessFunction's Context object (ctx.timestamp()).

Best, Fabian

In reply to this post by Ben Yan (2018-03-30 7:45 GMT+02:00)

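Following the ProcessFunction suggestion above, a minimal sketch in Scala: the function pairs each element with the timestamp exposed by its `Context`. `WithTimestamp` is a hypothetical name, and returning an `Option` is just one way to handle records that carry no timestamp.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Pairs every element with the timestamp Flink attached to it. With event time
// enabled and a Kafka 0.10+ consumer, that is the Kafka record timestamp.
class WithTimestamp[T] extends ProcessFunction[T, (T, Option[Long])] {
  override def processElement(
      value: T,
      ctx: ProcessFunction[T, (T, Option[Long])]#Context,
      out: Collector[(T, Option[Long])]): Unit = {
    // ctx.timestamp() returns a java.lang.Long that is null if no timestamp is set.
    val ts: Option[Long] = Option(ctx.timestamp()).map(_.longValue())
    out.collect((value, ts))
  }
}
```

In a job, event time would be enabled with `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` (deprecated since Flink 1.12, where event time is the default), and the function applied with `stream.process(new WithTimestamp[String])`, which needs `import org.apache.flink.streaming.api.scala._` for the implicit type information.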

Re: Record timestamp from kafka

Ben Yan
Hi Fabian.

If I use a ProcessFunction, I can get it! But I want to know how to get the Kafka timestamp in methods like flatMap and map of a DataStream, using the Scala programming language.
Thanks!

Best
Ben

In reply to this post by Fabian Hueske (Apr 4, 2018, 7:00 PM)

Re: Record timestamp from kafka

Chesnay Schepler
You must use a ProcessFunction for this; the timestamps are not exposed in any way to map/flatMap functions.

In reply to this post by Ben Yan (10.04.2018 12:29)

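Because map and flatMap cannot see the timestamp, one workaround is to do the flatMap work inside a ProcessFunction, whose `Collector` also accepts zero or more outputs per input. The sketch below, assuming the Scala DataStream API, splits lines into words and copies the timestamp into each output; `TimedWord` and `SplitWithTimestamp` are hypothetical names, and dropping records without a timestamp is a design choice, not a Flink requirement.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Hypothetical output type: one word plus the timestamp of the record it came from.
case class TimedWord(word: String, timestamp: Long)

// Works like flatMap (zero or more outputs per input line) but can also read
// the record timestamp, which plain map/flatMap functions never see.
class SplitWithTimestamp extends ProcessFunction[String, TimedWord] {
  override def processElement(
      line: String,
      ctx: ProcessFunction[String, TimedWord]#Context,
      out: Collector[TimedWord]): Unit = {
    // null if the record carries no timestamp, e.g. when not running in event time
    val ts: java.lang.Long = ctx.timestamp()
    if (ts != null) {
      val t = ts.longValue()
      line.split("\\s+").filter(_.nonEmpty).foreach(word => out.collect(TimedWord(word, t)))
    }
  }
}
```

With `import org.apache.flink.streaming.api.scala._` in scope, `lines.process(new SplitWithTimestamp)` would take the place of a `lines.flatMap(...)` call. Once the timestamp is a field of `TimedWord`, ordinary map and flatMap functions downstream can read it like any other data.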

Re: Record timestamp from kafka

Ben Yan
In reply to this post by Ben Yan
Hi Fabian:

I think it would be better without such a limitation. I would also like to ask about another problem. When I use BucketingSink (writing to AWS S3), the names of a few files still haven't changed after a checkpoint, so a small number of the final files keep the underscore (_) prefix. After analysis, I found that this is caused by the eventual consistency of S3. Are there any better solutions available? Thanks

Best
Ben

See: https://issues.apache.org/jira/browse/FLINK-8794?jql=text%20~%20%22BucketingSink%22

Re: Record timestamp from kafka

Ben Yan


On Apr 10, 2018, at 7:32 PM, Ben Yan wrote:

Hi Chesnay:

I think it would be better without such a limitation. I would also like to ask about another problem. When I use BucketingSink (writing to AWS S3), the names of a few files still haven't changed after a checkpoint, so a small number of the final files keep the underscore (_) prefix. After analysis, I found that this is caused by the eventual consistency of S3. Are there any better solutions available? Thanks
See: https://issues.apache.org/jira/browse/FLINK-8794?jql=text%20~%20%22BucketingSink%22
Best
Ben