Idle source configuration per topic with the Kafka Table API connector

Idle source configuration per topic with the Kafka Table API connector

Svend
Hi everyone,

My Flink streaming application consumes several Kafka topics, one of which receives traffic in a burst once per day.

I would like that topic not to hold back the progress of the watermark.
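
To make the concern concrete, here is a minimal sketch of why one quiet topic can stall everything downstream. It is a simplified model, not Flink's implementation: it assumes the combined watermark is the minimum over all inputs that are not marked idle. The class name, the topic names, and the watermark values are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

// Simplified model only: the names below are hypothetical, not Flink APIs.
final class CombinedWatermark {

    /** One input (for example, one topic) as seen by a downstream operator. */
    static final class Input {
        final String name;
        final long watermark;
        final boolean idle;

        Input(String name, long watermark, boolean idle) {
            this.name = name;
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /**
     * Returns the minimum watermark over all inputs that are not idle.
     * The result is empty when every input is idle.
     */
    static OptionalLong combine(List<Input> inputs) {
        return inputs.stream()
                .filter(in -> !in.idle)
                .mapToLong(in -> in.watermark)
                .min();
    }

    public static void main(String[] args) {
        // The bursty topic has not advanced since its last burst.
        List<Input> allActive = Arrays.asList(
                new Input("busy_topic", 1_000L, false),
                new Input("bursty_topic", 100L, false));
        // Same state, but the bursty topic is now flagged as idle.
        List<Input> burstyIdle = Arrays.asList(
                new Input("busy_topic", 1_000L, false),
                new Input("bursty_topic", 100L, true));

        System.out.println(combine(allActive).getAsLong());  // prints 100
        System.out.println(combine(burstyIdle).getAsLong()); // prints 1000
    }
}
```

In this model, the watermark stays at the bursty topic's last value until that topic is flagged idle, after which only the busy topic determines progress.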

Most of my code is currently using the SQL API and in particular the Table API Kafka connector.

I have read about the idle source configuration mechanism. Could you please confirm my understanding that:

* as per [1]: with the Table API Kafka connector, it is currently not possible to set the idle source parameter per topic, although it can be set globally on the StreamTableEnvironment with the "table.exec.source.idle-timeout" parameter

* as per [2]: with the DataStream Kafka connector, the idle source parameter can be set per topic by calling ".withIdleness()" on the WatermarkStrategy.
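
The practical difference between the two options can be sketched as follows. This is a simplified model under stated assumptions, not Flink code: a source is treated as idle once no record has arrived for at least the timeout, and the class name, method names, topic names, and durations are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model only: the names below are hypothetical, not Flink APIs.
final class IdleTimeoutModel {

    /** A source counts as idle once no record has arrived for at least timeoutMs. */
    static boolean isIdle(long lastRecordAtMs, long nowMs, long timeoutMs) {
        return nowMs - lastRecordAtMs >= timeoutMs;
    }

    /** Applies one shared timeout to every source, as a single global setting would. */
    static Map<String, Boolean> withGlobalTimeout(
            Map<String, Long> lastRecordAtMs, long nowMs, long timeoutMs) {
        Map<String, Boolean> idle = new HashMap<>();
        for (Map.Entry<String, Long> e : lastRecordAtMs.entrySet()) {
            idle.put(e.getKey(), isIdle(e.getValue(), nowMs, timeoutMs));
        }
        return idle;
    }

    /** Applies a separate timeout per source; every source needs an entry in timeoutsMs. */
    static Map<String, Boolean> withPerSourceTimeouts(
            Map<String, Long> lastRecordAtMs, long nowMs, Map<String, Long> timeoutsMs) {
        Map<String, Boolean> idle = new HashMap<>();
        for (Map.Entry<String, Long> e : lastRecordAtMs.entrySet()) {
            idle.put(e.getKey(), isIdle(e.getValue(), nowMs, timeoutsMs.get(e.getKey())));
        }
        return idle;
    }

    public static void main(String[] args) {
        long now = 36_000_000L; // ten hours, in milliseconds
        Map<String, Long> lastSeen = new HashMap<>();
        lastSeen.put("orders", now - 120_000L); // last record two minutes ago
        lastSeen.put("daily_batch", 0L);        // silent since the daily burst

        // A global timeout short enough for the bursty topic also flags "orders".
        Map<String, Boolean> global = withGlobalTimeout(lastSeen, now, 30_000L);
        System.out.println(global.get("orders"));      // prints true
        System.out.println(global.get("daily_batch")); // prints true

        // Per-source timeouts let "orders" tolerate longer gaps.
        Map<String, Long> timeouts = new HashMap<>();
        timeouts.put("orders", 300_000L);
        timeouts.put("daily_batch", 30_000L);
        Map<String, Boolean> perSource = withPerSourceTimeouts(lastSeen, now, timeouts);
        System.out.println(perSource.get("orders"));      // prints false
        System.out.println(perSource.get("daily_batch")); // prints true
    }
}
```

With a single global value, the timeout has to suit every topic at once; with per-source values, only the bursty topic needs a short one.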

If that is correct, I guess I can simply use the DataStream connector for that specific topic and then convert it to a Table.

Thanks a lot!

Svend








Re: Idle source configuration per topic with the Kafka Table API connector

JING ZHANG
Hi Svend,
Your solution should work well in Flink 1.13.0 and later, because those versions include many related improvements.

> as per [1]
Yes, "table.exec.source.idle-timeout" is not a table-level parameter but a global one. It applies to all table sources that declare a watermark clause but do not use SOURCE_WATERMARK().
> as per [2]
Yes.
> If that is correct, I guess I can simply use the DataStream connector for that specific topic and then convert it to a Table.
Yes, and please use SOURCE_WATERMARK() when converting the DataStream to a Table, as in the following demo:
Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .XXXX // other logical
            .watermark("columnName", "SOURCE_WATERMARK()")
            .build());
I would like to invite Jark and Timo to double-check; they are more familiar with the issue.

Best,
JING ZHANG

Replying to Svend's message of Sat, 29 May 2021, 3:34 PM.

Re: Idle source configuration per topic with the Kafka Table API connector

Svend
Awesome, thanks a lot for the clarifications, Jing Zhang; they're very useful.

Best,

Svend

Replying to JING ZHANG's message of Sun, 30 May 2021, 6:27 AM.