Best practice to handle update messages in stream

徐涛
Hi Experts,
Assume there is a stream whose content looks like this:

    Seq   ID    MONEY
    1     100   100
    2     100   200
    3     101   300

The record with Seq 2 is an update of the record with Seq 1, changing MONEY from 100 to 200.
If I register the stream as table T and want to sum the money, counting only the latest value of each ID, then writing "select sum(MONEY) from T" gives 600, which is incorrect: the expected result is 500 (200 for ID 100 plus 300 for ID 101).
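To make the arithmetic concrete, here is a small plain-Java sketch (not Flink API; `NaiveVsLatest` and its methods are hypothetical names used only for illustration) that computes both results over the three example rows. The naive sum also counts the overwritten Seq 1 value, while keeping only the last value seen per ID gives 500. It assumes records arrive in Seq order and needs Java 16+ for records.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NaiveVsLatest {
    // One record of the stream: its sequence number, ID and MONEY value.
    record Row(int seq, int id, long money) {}

    // Same result as "select sum(MONEY) from T": every record counts,
    // including the ones that were later overwritten.
    static long naiveSum(List<Row> rows) {
        long total = 0;
        for (Row r : rows) total += r.money();
        return total;
    }

    // Same result as summing latest(MONEY) per ID: a later record for an ID
    // replaces the earlier one before summing (assumes Seq order of arrival).
    static long latestPerIdSum(List<Row> rows) {
        Map<Integer, Long> latest = new LinkedHashMap<>();
        for (Row r : rows) latest.put(r.id(), r.money());
        long total = 0;
        for (long m : latest.values()) total += m;
        return total;
    }

    public static void main(String[] args) {
        List<Row> t = List.of(
                new Row(1, 100, 100),
                new Row(2, 100, 200),
                new Row(3, 101, 300));
        System.out.println(naiveSum(t));       // 600
        System.out.println(latestPerIdSum(t)); // 500
    }
}
```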

I can write a UDAF, for example `latest`, that computes the latest MONEY value for each ID; the SQL then looks like this:

    SELECT SUM(MONEY) FROM
    (
        SELECT ID, latest(MONEY) AS MONEY FROM T GROUP BY ID
    )

But this requires keeping every ID and its latest value in state, and I am worried that the state will grow too large. For now I use this approach and set the state retention time to several days so that the state does not grow too large. I wonder if there is a better way to do this.
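The incremental version of that query can be sketched in plain Java. This is a simulation under stated assumptions, not Flink's operator code, and `IncrementalLatestSum` is a hypothetical name: a map plays the role of keyed state, each update retracts the previous value of its ID from the running sum before adding the new one, and an optional idle-retention time drops keys that have not been updated recently. The map holds one entry per live ID, which is exactly the state growth described above. Assuming expiry simply forgets a key, the sketch also shows the trade-off of retention: an update that arrives after its key expired has no previous value to retract, so the old value stays in the sum.

```java
import java.util.HashMap;
import java.util.Map;

class IncrementalLatestSum {
    // Per-key state: the latest MONEY for an ID and when it was last updated.
    private record Entry(long money, long lastUpdate) {}

    private final Map<Integer, Entry> state = new HashMap<>();
    private final long retention; // idle time after which a key's state is dropped
    private long total = 0;       // running SUM over the latest value of every ID

    IncrementalLatestSum(long retention) { this.retention = retention; }

    // Processes one update for `id` at time `now` and returns the new sum.
    long onChange(int id, long money, long now) {
        expireIdleKeys(now);
        Entry previous = state.get(id);
        if (previous != null) {
            total -= previous.money(); // retract the value being overwritten
        }
        total += money;                // add the new latest value
        state.put(id, new Entry(money, now));
        return total;
    }

    // Drops keys idle for longer than the retention time. This only forgets
    // the per-key value; `total` still includes it. (A real engine would use
    // timers instead of scanning all keys on every record.)
    private void expireIdleKeys(long now) {
        state.values().removeIf(e -> now - e.lastUpdate() > retention);
    }

    int stateSize() { return state.size(); }

    public static void main(String[] args) {
        // No expiry: the update for ID 100 at time 20 retracts 200, adds 250.
        IncrementalLatestSum exact = new IncrementalLatestSum(Long.MAX_VALUE);
        exact.onChange(100, 100, 0);
        exact.onChange(100, 200, 1);
        exact.onChange(101, 300, 2);
        System.out.println(exact.onChange(100, 250, 20)); // 550

        // Retention of 10 time units: both keys expire before time 20,
        // so the old 200 is never retracted.
        IncrementalLatestSum expiring = new IncrementalLatestSum(10);
        expiring.onChange(100, 100, 0);
        expiring.onChange(100, 200, 1);
        expiring.onChange(101, 300, 2);
        System.out.println(expiring.onChange(100, 250, 20)); // 750
    }
}
```

With retention disabled the result after the late update is the correct 550 (250 + 300); with the short retention it becomes 750, double-counting the expired 200.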

So what is the best practice in this scenario? Does anyone have a suggestion? Thanks a lot.


Best
Henry

Re: Best practice to handle update messages in stream

Piotr Nowojski
Hi,

There is ongoing work [1] to natively support streams like the one you described (we call them upsert streams or changelogs). But it boils down to exactly the same thing you have done: aggregating the records per key with a `latest` aggregation function. Until this is supported natively, you can use the query you have written.

Regarding the state size: in most cases there is no way around this issue. Records overwriting a previous value can arrive at an arbitrary point in time, and for most operations (like the SUM aggregation in your case, or filtering) we need to keep the previous value for each key in state. Sometimes it might be possible to optimise the query and skip the "latest value" aggregation, if the following SQL operator either does not need to know the previous value (like a sink or a projection) or already knows the previous value anyway (like a join).
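The sink/projection case can be illustrated with a hedged plain-Java sketch; all names here (`StatelessUpsertPipeline`, `UpsertSink`, the cents projection) are hypothetical. If the query only projects each record and writes it to an upsert sink keyed by ID, every update can be handled on its own, because the sink overwrites the previous row for that key. The job keeps no per-key state; the latest value per ID lives only in the external table, modeled here as a map.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongUnaryOperator;

class StatelessUpsertPipeline {
    // One update from the stream.
    record Update(int id, long money) {}

    // Hypothetical upsert sink keyed by ID (think of a key-value table in an
    // external store). Writing an existing key overwrites it, so the job never
    // needs to look up or retract the previous value.
    static final class UpsertSink {
        private final Map<Integer, Long> table = new HashMap<>();
        void upsert(int id, long value) { table.put(id, value); }
        Map<Integer, Long> contents() { return Map.copyOf(table); }
    }

    // The projection is applied to each record independently: no keyed state.
    static Map<Integer, Long> run(List<Update> updates, LongUnaryOperator projection) {
        UpsertSink sink = new UpsertSink();
        for (Update u : updates) {
            sink.upsert(u.id(), projection.applyAsLong(u.money()));
        }
        return sink.contents();
    }

    public static void main(String[] args) {
        List<Update> t = List.of(new Update(100, 100), new Update(100, 200), new Update(101, 300));
        // Project MONEY to cents and upsert by ID.
        System.out.println(run(t, m -> m * 100));
    }
}
```

On the example rows the table ends up holding 20000 for ID 100 and 30000 for ID 101, the same rows the `latest` aggregation would produce, without the job storing anything per ID. Summing over that table, however, would again need the previous values, which is why the SUM case in the question cannot skip the state.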

Piotr Nowojski

[1] https://issues.apache.org/jira/browse/FLINK-8545

(In reply to 徐涛's message of 21 Mar 2019, 09:39.)