How to convert retract stream to dynamic table?

James Baker
Hi!
I've been looking at Flink for the last few days and very much appreciate the concept of Dynamic Tables. It solves a lot of my needs and handles much of the complex state tracking that is otherwise painful. I have a question about the composability of the system that the docs don't answer.

The docs use the example of 'SELECT user, COUNT(url) as cnt FROM clicks GROUP BY user', where clicks is an incoming stream of users and the URLs they've clicked.

From such a Table, I can then get a retract stream written into an external system, perhaps outputting (true, User1, 1), ..., (true, User1, 2), indicating that User1 has clicked on something.
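
To make the message format concrete, here is a minimal sketch in plain Python (not the Flink API) of how a per-user count could be encoded as a retract stream. It assumes the usual convention that a `True` flag adds a row and a `False` flag retracts a previously emitted row; the function name and the sample click data are hypothetical.

```python
from collections import defaultdict

def count_clicks_as_retract_stream(clicks):
    """Yield (flag, user, cnt) messages for a running COUNT(url) per user."""
    counts = defaultdict(int)
    for user, _url in clicks:
        old = counts[user]
        if old > 0:
            # Withdraw the previously emitted result for this user.
            yield (False, user, old)
        counts[user] = old + 1
        # Emit the updated result.
        yield (True, user, old + 1)

# Hypothetical input: (user, url) click events.
clicks = [("User1", "/home"), ("User2", "/cart"), ("User1", "/help")]
messages = list(count_clicks_as_retract_stream(clicks))
# messages == [(True, 'User1', 1), (True, 'User2', 1),
#              (False, 'User1', 1), (True, 'User1', 2)]
```

Note that the second click by User1 produces two messages: a retraction of the old count followed by the new count.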

Is there an idiomatic way to convert a retract stream into a semantically equivalent table?

Thanks,
James

Re: How to convert retract stream to dynamic table?

Kurt Young
Hi James,

If I understand correctly, you can use `TableEnvironment#sqlQuery` to achieve
what you want. You can pass in the whole SQL statement and get a `Table` back
from the method. I believe this is the table you want: it is semantically
equivalent to the stream you mentioned.

For example, you can apply further SQL operations to the `Table`, such as a
`GROUP BY cnt` on the returned table. Think of it this way: Flink attaches
another aggregation operator to the original plan, and this operator consumes
the retraction stream produced by the original SQL statement and generates
correct results.
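
As a rough illustration of a downstream operator consuming retractions, the following plain Python sketch (not Flink code) computes "number of users per cnt", i.e. something like a `GROUP BY cnt` over the first query's result. The function name and the sample messages are assumptions made for this example.

```python
from collections import Counter

def users_per_count(retract_messages):
    """Fold (flag, user, cnt) retract messages into {cnt: number_of_users}."""
    users_by_cnt = Counter()
    for is_add, _user, cnt in retract_messages:
        # An add message increments the group, a retraction decrements it.
        users_by_cnt[cnt] += 1 if is_add else -1
        if users_by_cnt[cnt] == 0:
            # Drop groups that no longer contain any row.
            del users_by_cnt[cnt]
    return dict(users_by_cnt)

# Hypothetical retract stream produced by the per-user count query.
messages = [
    (True, "User1", 1),
    (True, "User2", 1),
    (False, "User1", 1),  # User1's old count is withdrawn ...
    (True, "User1", 2),   # ... and replaced by the new one.
]
result = users_per_count(messages)
# result == {1: 1, 2: 1}
```

Without handling the retraction, User1 would be counted under both cnt = 1 and cnt = 2, which is why the downstream operator has to understand the change flag.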

Best,
Kurt


On Thu, Dec 19, 2019 at 1:25 AM James Baker <[hidden email]> wrote:

Re: How to convert retract stream to dynamic table?

Dawid Wysakowicz-2

Hi,

Correct me if I am wrong, James, but I think your original question was how to create a Table out of a changelog (a stream with a change flag). Unfortunately, I think this is not possible right now. It is definitely high on our priority list for the near future. There were earlier attempts[1] to implement it, but we must first clarify all aspects of such an operation.
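
For readers wondering what such a conversion would mean semantically, here is a small plain Python sketch that applies a changelog to an in-memory multiset of rows. It only illustrates the idea and is not a Flink API; the function name, the row layout, and the choice to raise an error on an unmatched retraction are all assumptions.

```python
from collections import Counter

def materialize(changelog):
    """Apply (flag, row) change messages and return the resulting rows, sorted."""
    rows = Counter()  # multiset: row -> number of copies currently in the table
    for is_add, row in changelog:
        if is_add:
            rows[row] += 1
        else:
            if rows[row] == 0:
                raise ValueError(f"retraction for a row that is not present: {row!r}")
            rows[row] -= 1
            if rows[row] == 0:
                del rows[row]
    return sorted(rows.elements())

# Hypothetical changelog of (user, cnt) rows.
changelog = [
    (True, ("User1", 1)),
    (True, ("User2", 1)),
    (False, ("User1", 1)),
    (True, ("User1", 2)),
]
table = materialize(changelog)
# table == [('User1', 2), ('User2', 1)]
```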

Best,

Dawid

[1] https://github.com/apache/flink/pull/6787

On 19/12/2019 04:05, Kurt Young wrote:

Re: How to convert retract stream to dynamic table?

David Anderson-2
The Elasticsearch, HBase, and JDBC[1] table sinks all support streaming UPSERT mode[2]. While not exactly the same as RETRACT mode, it seems like this might get the job done (unless I'm missing something, which is entirely possible).
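
To show how upsert mode differs from retract mode, here is a plain Python sketch (not a Flink connector) of a sink that applies upsert messages to a keyed store. It assumes `user` is the unique key and that a `False` flag means "delete this key"; the names and sample data are hypothetical.

```python
def apply_upserts(messages):
    """Apply (flag, user, cnt) upsert messages to a dict keyed by user."""
    table = {}
    for is_upsert, user, cnt in messages:
        if is_upsert:
            # Insert or overwrite: no separate retraction of the old value is needed.
            table[user] = cnt
        else:
            # Delete message: remove the key if it exists.
            table.pop(user, None)
    return table

# Hypothetical upsert stream for the per-user click count.
upserts = [(True, "User1", 1), (True, "User2", 1), (True, "User1", 2)]
table = apply_upserts(upserts)
# table == {'User1': 2, 'User2': 1}
```

An update is a single message here because the key identifies which row to overwrite, whereas a retract stream needs a retraction followed by an add.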

David




On Thu, Dec 19, 2019 at 9:20 AM Dawid Wysakowicz <[hidden email]> wrote:
