A question about Flink SQL retract stream


A question about Flink SQL retract stream

Henry Dai

Dear Flink developers & users,

    I have a question about Flink SQL that has been giving me a lot of trouble. Thank you very much for any help.

    Let's assume we have two data streams, `order` and `order_detail`, both coming from MySQL binlog.

    Table `order` schema:
    id              int     primary key
    order_id    int
    status        int

    Table `order_detail` schema:
    id               int     primary key
    order_id     int    
    quantity      int

    order : order_detail = 1:N; they are joined on `order_id`.

    Suppose we have the following data sequence, and we compute sum(quantity) grouped by order.order_id after the two streams are joined:

time    order                          order_detail                      result
        id   order_id   status         id   order_id   quantity
1       1    12345      0
2                                      1    12345      10                (T 10)
3                                      2    12345      11                (F 10)(T 21)
4                                      3    12345      12                (F 21)(T 33)
5       1    12345      1                                                (F 33)(T 21)(F 21)(T 10)(F 10)(T 12)(F 12)(T 23)(F 23)(T 33)
   
    Code:
    // Register the two binlog-backed sources.
    tableEnv.registerTableSource("a", new Order());
    tableEnv.registerTableSource("b", new OrderDetail());
    // Collapse each binlog stream to the latest value per primary key.
    Table tbl1 = tableEnv.sqlQuery("SELECT id, LAST_VALUE(order_id) AS order_id, LAST_VALUE(status) AS status FROM a GROUP BY id");
    tableEnv.registerTable("ax", tbl1);
    Table tbl2 = tableEnv.sqlQuery("SELECT id, LAST_VALUE(order_id) AS order_id, LAST_VALUE(quantity) AS quantity FROM b GROUP BY id");
    tableEnv.registerTable("bx", tbl2);
    // Join the two upsert views and sum the quantity per order_id.
    Table table = tableEnv.sqlQuery("SELECT ax.order_id, SUM(bx.quantity) FROM ax JOIN bx ON ax.order_id = bx.order_id GROUP BY ax.order_id");
    DataStream<Tuple2<Boolean, Row>> stream = tableEnv.toRetractStream(table, Row.class);
    stream.print();

    Result:
    (true,12345,10)
    (false,12345,10)
    (true,12345,21)
    (false,12345,21)
    (true,12345,33)
    (false,12345,33)
    (true,12345,21)
    (false,12345,21)
    (true,12345,10)
    (false,12345,10)
    (true,12345,12)
    (false,12345,12)
    (true,12345,23)
    (false,12345,23)
    (true,12345,33)


    I can't understand why Flink emits so many records at time 5.

    In production, we consume the binlog stream from Kafka, convert the stream to a Flink table, and after the SQL computation convert the result table back to a stream, where we keep only the TRUE (accumulate) messages of the retract stream and emit them to a downstream Kafka topic, roughly as sketched below.
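
    A minimal sketch of that filtering step, continuing from the `stream` variable in the code above (the Kafka sink itself is omitted):

    // Keep only the accumulate (true) messages of the retract stream.
    DataStream<Row> accumulateOnly = stream
            .filter(t -> t.f0)      // drop retraction (false) messages
            .map(t -> t.f1)         // unwrap the Row payload
            .returns(Row.class);    // lambdas need explicit return type information
    // accumulateOnly is then serialized and written to the downstream Kafka topic.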

    Is there a way to make the Flink dynamic table really behave that way (I mean, trigger the computation only once per input record), so that when we receive (1, 12345, 1) from `order`, it only emits (F 33)(T 33)?

--
best wishes
hengyu

Re: A question about Flink SQL retract stream

Jark Wu-3
Thanks Henry for the detailed example, 

I will explain why there are so many records at time 5.
That is because the retraction mechanism in Flink SQL is triggered per record, so there is record amplification in your case.
At time 5, the LAST_VALUE aggregation for stream a will first emit -(1, 12345, 0) and then +(1, 12345, 1).
When the -(1, 12345, 0) arrives at the join operator, it joins the previous 3 records in stream b and sends 3 retraction messages.
When the 3 retraction messages arrive at the sum aggregation, it produces (F 33)(T 21)(F 21)(T 10)(F 10).
In contrast, when the +(1, 12345, 1) arrives at the join operator, it sends 3 joined accumulation messages to the sum aggregation, which produces (T 12)(F 12)(T 23)(F 23)(T 33).

In Flink SQL, the mini-batch [1] optimization can reduce this amplification, because the operators are then triggered on a mini-batch of records instead of on every single record.
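
For what it's worth, a minimal sketch of enabling mini-batch via the table configuration (option names as in recent Flink releases; the values are just placeholders to tune):

    // Buffer input records so intermediate retractions are folded before being emitted.
    Configuration conf = tableEnv.getConfig().getConfiguration();  // org.apache.flink.configuration.Configuration
    conf.setString("table.exec.mini-batch.enabled", "true");
    conf.setString("table.exec.mini-batch.allow-latency", "1 s");  // maximum time to buffer records
    conf.setString("table.exec.mini-batch.size", "1000");          // maximum number of buffered records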

Best,
Jark




Re: A question about Flink SQL retract stream

Jark Wu-3
Yes. 

There is also a Flink Forward session [1] (from the 14:00 mark) that talks about
the internals of the underlying changelog mechanism with a visual example.

Best,
Jark


On Thu, 5 Nov 2020 at 15:48, Henry Dai <[hidden email]> wrote:
Hi Jark, 

Thanks for your reply, it helps me a lot!

I have tested the mini-batch optimization, and the results show that it greatly reduces the number of records produced when a Flink Table is converted to a retract DataStream.

It seems I had a wrong understanding of Flink's "Dynamic Table" concept: if a record R1 arrives at the SQL computation, the result table's data view is V1 before R1 is processed and V2 after R1 is processed, and I used to believe the changelog is simply Diff(V2, V1).

Actually, there are a lot of intermediate changes emitted while R1 is being processed.

Thanks!



--
best wishes
hengyu