How to make two SQLs use the same KafkaTableSource?

How to make two SQLs use the same KafkaTableSource?

Tony Wei
Hi,

I used `flinkTableEnv.connect(new Kafka()...).registerTableSource(...)` to register my Kafka table.
However, I found that because SQL translation is lazy, the table is only translated into a
DataStream when certain methods are called, for example `Table#toRetractStream`.

So, when I use two SQLs in one application job, the same Kafka table is constructed twice. This is
not a problem on the Flink side, because the two source operators each hold their own offset state.
But on the Kafka side, both sources consume with the same group_id.
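For concreteness, here is a minimal sketch of what I am doing. The topic, field, and group names
are placeholders rather than my real configuration:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class TwoSqlsOneTopic {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register a single Kafka-backed table.
        tEnv.connect(new Kafka()
                .version("universal")
                .topic("events")                               // placeholder topic
                .property("bootstrap.servers", "localhost:9092")
                .property("group.id", "my-group"))             // shared by both queries
            .withFormat(new Json().deriveSchema())
            .withSchema(new Schema()
                .field("user_name", Types.STRING)
                .field("amount", Types.DOUBLE))
            .inAppendMode()
            .registerTableSource("kafka_events");

        Table big = tEnv.sqlQuery("SELECT user_name, amount FROM kafka_events WHERE amount > 100");
        Table small = tEnv.sqlQuery("SELECT user_name, amount FROM kafka_events WHERE amount <= 100");

        // Each toRetractStream call triggers its own translation, so the job
        // graph ends up with two independent Kafka source operators, both
        // consuming the topic with group.id "my-group".
        tEnv.toRetractStream(big, Row.class).print();
        tEnv.toRetractStream(small, Row.class).print();

        env.execute("two-sqls-one-topic");
    }
}
```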

I want to make sure that only one Kafka source commits the group_id's offsets back to Kafka. A
workaround might be to register the same Kafka topic twice under different table names and
group_ids, one for each SQL (sketched below). But I would still like to know whether there is any
way to make the two SQLs read from the same KafkaTableSource. Thanks in advance.
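To spell out the workaround I mean, roughly (reusing the imports from the sketch above; the table
names and group ids are again placeholders):

```java
// Register the same topic under two table names, each with its own consumer
// group, so each SQL gets a source with a distinct group.id.
static void registerKafkaTable(StreamTableEnvironment tEnv, String tableName, String groupId) {
    tEnv.connect(new Kafka()
            .version("universal")
            .topic("events")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", groupId))
        .withFormat(new Json().deriveSchema())
        .withSchema(new Schema()
            .field("user_name", Types.STRING)
            .field("amount", Types.DOUBLE))
        .inAppendMode()
        .registerTableSource(tableName);
}

// usage:
// registerKafkaTable(tEnv, "kafka_events_a", "group-a");
// registerKafkaTable(tEnv, "kafka_events_b", "group-b");
```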

Best,
Tony Wei

Re: How to make two SQLs use the same KafkaTableSource?

Tony Wei
Forgot to send this to the user mailing list.

Tony Wei <[hidden email]> wrote on Fri, Aug 9, 2019 at 12:36 PM:
Hi Zhenghua,

I didn't get your point. It seems that `isEagerOperationTranslation` always returns false. Does
that mean that even if I use the Blink planner, the SQL translation still happens in a lazy manner?

Or do you mean the Blink planner will recognize the two SQLs and link them to the same Kafka
source if they both use the same Kafka table, even though the translation is lazy?

I'm not familiar with the details of the translation process, but I guess translating eagerly is
not the only solution. If the translation of the second SQL can reuse the operators from the first
SQL, then it should be possible to link both to the same Kafka source operator.

Best,
Tony Wei

Zhenghua Gao <[hidden email]> wrote on Fri, Aug 9, 2019 at 11:57 AM:
This needs EagerOperationTranslation [1] support. You can try the Blink planner in 1.9.0.


Re: How to make two SQLs use the same KafkaTableSource?

Zhenghua Gao
The Blink planner supports lazy translation for multiple SQLs, and the common nodes will be reused within a single job.
The only thing you need to note here is that the unified TableEnvironmentImpl does not support conversions between Table(s) and Stream(s).
You must use the pure SQL API (DDL/DML via sqlUpdate, DQL via sqlQuery).
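A minimal sketch of that style in 1.9 follows. Table names, queries, connector properties, and
paths here are illustrative only, and the exact DDL property keys may vary with your connector and
format versions:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BlinkSharedSource {
    public static void main(String[] args) throws Exception {
        // Unified TableEnvironment with the Blink planner (Flink 1.9).
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Source DDL in the 1.9 property style (keys may differ by version).
        tEnv.sqlUpdate(
            "CREATE TABLE kafka_events (" +
            "  user_name VARCHAR," +
            "  amount DOUBLE" +
            ") WITH (" +
            "  'connector.type' = 'kafka'," +
            "  'connector.version' = 'universal'," +
            "  'connector.topic' = 'events'," +
            "  'connector.properties.0.key' = 'bootstrap.servers'," +
            "  'connector.properties.0.value' = 'localhost:9092'," +
            "  'connector.properties.1.key' = 'group.id'," +
            "  'connector.properties.1.value' = 'my-group'," +
            "  'update-mode' = 'append'," +
            "  'format.type' = 'json'," +
            "  'format.derive-schema' = 'true'" +
            ")");

        // Two illustrative filesystem sinks, one per query.
        for (String name : new String[] {"sink_a", "sink_b"}) {
            tEnv.sqlUpdate(
                "CREATE TABLE " + name + " (" +
                "  user_name VARCHAR," +
                "  amount DOUBLE" +
                ") WITH (" +
                "  'connector.type' = 'filesystem'," +
                "  'connector.path' = '/tmp/" + name + "'," +
                "  'format.type' = 'csv'" +
                ")");
        }

        // Both DML statements reference the same source table. Nothing is
        // translated until execute(), and the planner can then reuse the
        // common Kafka source node, so the topic is consumed only once.
        tEnv.sqlUpdate("INSERT INTO sink_a SELECT user_name, amount FROM kafka_events WHERE amount > 100");
        tEnv.sqlUpdate("INSERT INTO sink_b SELECT user_name, amount FROM kafka_events WHERE amount <= 100");

        tEnv.execute("two-sqls-shared-source");
    }
}
```

Because nothing is translated until execute(), the planner sees both INSERT statements at once and
can plan them into a single job that consumes the topic with one Kafka source.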

Best Regards,
Zhenghua Gao



Re: How to make two SQLs use the same KafkaTableSource?

Tony Wei
Hi Zhenghua,

The Blink planner supports lazy translation for multiple SQLs, and the common nodes will be reused within a single job.

That is very helpful, thanks for your clarification.
 
The only thing you need to note here is that the unified TableEnvironmentImpl does not support conversions between Table(s) and Stream(s).
You must use the pure SQL API (DDL/DML via sqlUpdate, DQL via sqlQuery).

Does this mean that only the common nodes generated from the pure SQL API can be reused, and that
operator nodes created from the DataStream API will not be recognized by the Blink planner? If that
is the case, it is fine with me. My original question focused only on reusing nodes in the SQL API,
and it seems the Blink planner is what I need. Thanks for your help again.

Best,
Tony Wei
