[SQL DDL] How to extract timestamps from Kafka message's metadata

[SQL DDL] How to extract timestamps from Kafka message's metadata

Dongwon Kim-2
Hi,

I'm working with a Kafka topic whose timestamps are not in the message body but in the message's metadata.

I want to declare a table on the topic with DDL, but "rowtime_column_name" in the definition below seems to accept only existing columns:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

Can I define watermarks in this situation, where timestamps appear only in the metadata?

Thanks,

Dongwon
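
For illustration, the limitation described in the question can be sketched as a DDL statement in which the rowtime column has to be a physical field of the message body. The table name, column names, and option values are hypothetical; the option keys are the ones used by the Flink 1.11 Kafka SQL connector.

```sql
-- Hypothetical table and column names, for illustration only.
CREATE TABLE events (
  user_id STRING,
  payload STRING,
  -- WATERMARK FOR has to reference a column declared in this schema,
  -- so here the timestamp must be a field of the message body.
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'example-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

If the JSON body carries no `ts` field, this declaration has nothing to bind the watermark to, which is the situation the question is about.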

Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

Dawid Wysakowicz-2

I'm afraid it is not supported yet. The discussion [1] about supporting it started some time ago, but unfortunately it has not concluded yet.

One approach I can think of to work around this limitation is to provide your own Format [2]. Unfortunately, it is not the most straightforward solution.

Best,

Dawid

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html#encoding--decoding-formats
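
FLIP-107 [1] was implemented after this thread and shipped with Flink 1.12 as metadata columns. A minimal sketch of the resulting declaration follows, assuming Flink 1.12 or later and the Kafka connector's 'timestamp' metadata key. The table name, column names, and option values are hypothetical, and the declared type of the metadata column may need adjusting depending on the Flink version.

```sql
-- Assumes Flink 1.12 or later; not available in the 1.11 release
-- discussed in this thread. All names are hypothetical.
CREATE TABLE events (
  user_id STRING,
  payload STRING,
  -- Read the timestamp from the Kafka record metadata, not the body.
  ts TIMESTAMP(3) METADATA FROM 'timestamp',
  -- The metadata column can then serve as the rowtime attribute.
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'example-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

On Flink 1.11 this statement is rejected, so the custom Format [2] remains the DDL-only workaround for that release.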

On 11/08/2020 09:20, Dongwon Kim wrote:
> Hi,
>
> I'm working with a Kafka topic whose timestamps are not in the message body but in the message's metadata.
>
> I want to declare a table on the topic with DDL, but "rowtime_column_name" in the definition below seems to accept only existing columns:
> WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
>
> Can I define watermarks in this situation, where timestamps appear only in the metadata?
>
> Thanks,
>
> Dongwon


Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

Dongwon Kim-2
Hi Dawid,

I'll try your suggestion [2] and wait for [1] to be supported in an upcoming version.

Thanks,

P.S. It's not easy to insert the timestamp into the body because that would affect other applications, so I hope [1] becomes available soon.

Dongwon


On Tue, Aug 11, 2020 at 4:31 PM Dawid Wysakowicz <[hidden email]> wrote:
> I'm afraid it is not supported yet. The discussion [1] about supporting it started some time ago, but unfortunately it has not concluded yet.
>
> One approach I can think of to work around this limitation is to provide your own Format [2]. Unfortunately, it is not the most straightforward solution.
>
> Best,
>
> Dawid
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
>
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html#encoding--decoding-formats


Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

Fabian Hueske-2
Hi Dongwon,

Maybe you can add your use case to the FLIP-107 discussion thread [1] and thereby support the proposal (after checking that it would solve your problem).

It's always helpful to learn about the requirements of users when designing new features.
It also helps to prioritize which features to push for.

Thank you,
Fabian


On Tue, Aug 11, 2020 at 09:42, Dongwon Kim <[hidden email]> wrote:
> Hi Dawid,
>
> I'll try your suggestion [2] and wait for [1] to be supported in an upcoming version.
>
> Thanks,
>
> P.S. It's not easy to insert the timestamp into the body because that would affect other applications, so I hope [1] becomes available soon.
>
> Dongwon



Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

Timo Walther
In reply to this post by Dongwon Kim-2
Hi Dongwon,

Another possibility is to use the DataStream API first. There you can
extract the metadata and use DataStream.assignTimestampsAndWatermarks
before converting the stream to a table.

Regards,
Timo


On 11.08.20 09:41, Dongwon Kim wrote:

> Hi Dawid,
>
> I'll try your suggestion [2] and wait for [1] to be supported in an
> upcoming version.
>
> Thanks,
>
> P.S. It's not easy to insert the timestamp into the body because that
> would affect other applications, so I hope [1] becomes available
> soon.
>
> Dongwon

Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

Dongwon Kim-2
Hi Timo,

Thanks for your input.
We've been considering that as well, but this time I wanted to use only TableEnvironment, without the DataStream API.

Still, that would be the most straightforward solution this time around.

Thanks and regards,

Dongwon


On Tue, Aug 11, 2020 at 4:50 PM Timo Walther <[hidden email]> wrote:
> Hi Dongwon,
>
> Another possibility is to use the DataStream API first. There you can
> extract the metadata and use DataStream.assignTimestampsAndWatermarks
> before converting the stream to a table.
>
> Regards,
> Timo



Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

Jark Wu-3

On Tue, 11 Aug 2020 at 16:08, Dongwon Kim <[hidden email]> wrote:
> Hi Timo,
>
> Thanks for your input.
> We've been considering that as well, but this time I wanted to use only TableEnvironment, without the DataStream API.
>
> Still, that would be the most straightforward solution this time around.
>
> Thanks and regards,
>
> Dongwon

