Dear Flink developers and users,
I have a question about Flink SQL that has been giving me a lot of trouble. Any help would be much appreciated.

Let's assume we have two data streams, `order` and `order_detail`, both coming from the MySQL binlog.

Table `order` schema:
    id        int  primary key
    order_id  int
    status    int

Table `order_detail` schema:
    id        int  primary key
    order_id  int
    quantity  int

order : order_detail = 1:N, and they are joined on `order_id`.

Suppose we have the following data sequence, and we compute sum(quantity) grouped by order.order_id after the join:

    time | order                  | order_detail             | result
         | id  order_id  status   | id  order_id  quantity   |
    -----+------------------------+--------------------------+------------------------------
    1    | 1   12345     0        |                          |
    2    |                        | 1   12345     10         | (T 10)
    3    |                        | 2   12345     11         | (F 10)(T 21)
    4    |                        | 3   12345     12         | (F 21)(T 33)
    5    | 1   12345     1        |                          | (F 33)(T 21)(F 21)(T 10)(F 10)(T 12)(F 12)(T 23)(F 23)(T 33)

Code:

    tableEnv.registerTableSource("a", new Order());
    tableEnv.registerTableSource("b", new OrderDetail());

    // Keep only the latest version of each `order` row, keyed by primary key.
    Table tbl1 = tableEnv.sqlQuery("SELECT id, LAST_VALUE(order_id) AS order_id, LAST_VALUE(status) AS status FROM a GROUP BY id");
    tableEnv.registerTable("ax", tbl1);

    // Keep only the latest version of each `order_detail` row, keyed by primary key.
    Table tbl2 = tableEnv.sqlQuery("SELECT id, LAST_VALUE(order_id) AS order_id, LAST_VALUE(quantity) AS quantity FROM b GROUP BY id");
    tableEnv.registerTable("bx", tbl2);

    // Join the two deduplicated tables and sum the quantity per order.
    Table table = tableEnv.sqlQuery("SELECT ax.order_id, SUM(bx.quantity) FROM ax JOIN bx ON ax.order_id = bx.order_id GROUP BY ax.order_id");
    DataStream<Tuple2<Boolean, Row>> stream = tableEnv.toRetractStream(table, Row.class);
    stream.print();

Result:

    (true,12345,10)
    (false,12345,10)
    (true,12345,21)
    (false,12345,21)
    (true,12345,33)
    (false,12345,33)
    (true,12345,21)
    (false,12345,21)
    (true,12345,10)
    (false,12345,10)
    (true,12345,12)
    (false,12345,12)
    (true,12345,23)
    (false,12345,23)
    (true,12345,33)

I can't understand why Flink emits so many records at time 5.

In production, we consume the binlog stream from Kafka and convert the stream to a Flink table. After the SQL computation, we convert the result table back to a Flink stream, keep only the TRUE messages of the retract stream, and emit them to downstream Kafka.
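To make the time-5 sequence concrete, here is a rough plain-Java sketch (no Flink dependency) of a retracting SUM for a single order_id. It is only a simulation, not Flink's actual operator code. It assumes that the status update reaches the join as a retraction of the old `order` row followed by an insert of the new one, and that the join therefore retracts the three joined rows (12, 11, 10) and then re-adds them in the same order. That ordering is chosen only because it reproduces the result column above. The names `RetractSumDemo`, `simulateTime5`, and `trueOnly` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation of a retracting SUM(quantity) for one order_id. */
final class RetractSumDemo {

    private long sum = 0;      // current SUM(quantity) for the key
    private int rowCount = 0;  // number of joined rows currently in the group
    private final List<String> out = new ArrayList<>();

    /** Applies one change from the join: isAdd=true is an insert, false a retraction. */
    void apply(boolean isAdd, int quantity) {
        if (rowCount > 0) {
            out.add("(F " + sum + ")");  // retract the previously emitted result
        }
        sum += isAdd ? quantity : -quantity;
        rowCount += isAdd ? 1 : -1;
        if (rowCount > 0) {
            out.add("(T " + sum + ")");  // emit the new result unless the group is empty
        }
    }

    /** Replays times 2 to 4, then returns only the messages produced at time 5. */
    static List<String> simulateTime5() {
        RetractSumDemo agg = new RetractSumDemo();
        // Times 2, 3, 4: three order_detail rows join with the order row.
        agg.apply(true, 10);
        agg.apply(true, 11);
        agg.apply(true, 12);
        int before = agg.out.size();
        // Time 5 (assumed order): old order row retracted, so all joined rows go away...
        agg.apply(false, 12);
        agg.apply(false, 11);
        agg.apply(false, 10);
        // ...then the new order row is inserted, so all joined rows come back.
        agg.apply(true, 12);
        agg.apply(true, 11);
        agg.apply(true, 10);
        return new ArrayList<>(agg.out.subList(before, agg.out.size()));
    }

    /** Keeps only the TRUE (add) messages, as the production job described above does. */
    static List<String> trueOnly(List<String> messages) {
        List<String> kept = new ArrayList<>();
        for (String m : messages) {
            if (m.startsWith("(T")) {
                kept.add(m);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> t5 = simulateTime5();
        // prints: (F 33)(T 21)(F 21)(T 10)(F 10)(T 12)(F 12)(T 23)(F 23)(T 33)
        System.out.println(String.join("", t5));
        // prints: (T 21)(T 10)(T 12)(T 23)(T 33)
        System.out.println(String.join("", trueOnly(t5)));
    }
}
```

Under these assumptions, each of the six changes from the join triggers its own update of the sum, which accounts for the ten messages. If only the TRUE messages are kept, the downstream consumer sees the intermediate values 21, 10, 12, and 23 before the final 33.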
Is there a way to get real dynamic-table behavior in Flink (I mean, triggering the computation only once), so that when we receive (1,12345,1) from `order`, only (F 33)(T 33) is emitted?

Best wishes,
hengyu