Hi everyone,
I am running into some problems while using Flink SQL. My SQL is as follows:

SELECT COUNT(DISTINCT mm.student_id), SUM(price)
FROM (
  SELECT a.student_id, a.confirm_date_time, SUM(a.real_total_price) AS price
  FROM (
    SELECT DISTINCT
      po.id AS order_id,
      po.student_id,
      po.create_id,
      po.paid_date_time,
      po.confirm_date_time,
      po.real_total_price
    FROM `pack_order` po
    LEFT JOIN user1 u ON u.id = po.student_id
    WHERE po.status = 'PAY_CONFIRMED'
      AND po.real_total_price > 500
      AND po.confirm_date_time >= '2018-09-27'
      AND po.confirm_date_time < '2018-09-28'
      AND po.type IN (1, 2)
      AND u.id IS NULL
  ) a
  JOIN pack_order1 por ON a.student_id = por.student_id
  GROUP BY a.student_id, a.confirm_date_time
  HAVING MIN(por.confirm_date_time) = a.confirm_date_time
) mm

I registered three tables by consuming data from three Kafka topics (each topic represents one table and has only one partition), computed the result with Flink SQL, and wrote the result to another Kafka topic, again using a single partition. The final result is one count value and one sum value.

According to my understanding, the result in Kafka should always increase, but the actual output is as follows: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1671/%E5%9B%BE%E7%89%87.png

The result marked in the figure is our expected final result, but the last result written is not, that is, the final calculated result is wrong. Observing the output, the values first increase, then decrease, then increase again, which we cannot explain. We need the count and sum values in Kafka to be monotonically increasing, because our goal is a real-time display.

Can you please explain why this happens and how to avoid it?

Thank you very much, best wishes!
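For reference, here is a minimal sketch (older Scala Table API, as used elsewhere in this thread) of how the nested query could be split into registered intermediate tables so each stage can be inspected on its own. The variable and table names (tEnv, filtered_orders, first_orders) are assumptions, the SELECT list of the first stage is shortened for brevity, and the Kafka source registration is omitted because it depends on the connector version:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// pack_order, user1 and pack_order1 are assumed to be registered already
// from the three Kafka topics (connector setup omitted).

// Stage 1: the left join used as an anti-join (u.id IS NULL).
val filtered: Table = tEnv.sqlQuery(
  """SELECT DISTINCT po.id AS order_id, po.student_id, po.confirm_date_time,
    |       po.real_total_price
    |FROM pack_order po
    |LEFT JOIN user1 u ON u.id = po.student_id
    |WHERE po.status = 'PAY_CONFIRMED' AND po.real_total_price > 500
    |  AND po.confirm_date_time >= '2018-09-27'
    |  AND po.confirm_date_time < '2018-09-28'
    |  AND po.type IN (1, 2) AND u.id IS NULL""".stripMargin)
tEnv.registerTable("filtered_orders", filtered)

// Stage 2: keep rows whose confirm_date_time equals the student's earliest
// confirm_date_time in pack_order1.
val firstOrders: Table = tEnv.sqlQuery(
  """SELECT a.student_id, a.confirm_date_time, SUM(a.real_total_price) AS price
    |FROM filtered_orders a
    |JOIN pack_order1 por ON a.student_id = por.student_id
    |GROUP BY a.student_id, a.confirm_date_time
    |HAVING MIN(por.confirm_date_time) = a.confirm_date_time""".stripMargin)
tEnv.registerTable("first_orders", firstOrders)

// Stage 3: the global (non-keyed) aggregate.
val result: Table = tEnv.sqlQuery(
  "SELECT COUNT(DISTINCT student_id) AS cnt, SUM(price) AS total FROM first_orders")

// Print the retract stream of any stage to see where the values start to dip.
tEnv.toRetractStream[Row](result).print()
env.execute("debug-retract")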
My final result is written to Kafka in the following way, because KafkaTableSink does not support retract mode. I am not sure whether this approach affects the calculation result.

val userTest: Table = tEnv.sqlQuery(sql)
val endStream = tEnv.toRetractStream[Row](userTest)
//userTest.insertInto("kafkaSink")

val myProducer = new FlinkKafkaProducer011[String](
  kafkaBrokers,           // broker list
  topic,                  // target topic
  new SimpleStringSchema) // serialization schema

endStream.map(x => {
  s"${x._1}:${x._2.toString}"
}).addSink(myProducer)
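A minimal sketch, reusing endStream and myProducer from the snippet above: the string written to Kafka currently starts with true or false, so the consumer also sees every retraction message. One option is to forward only the accumulate messages. This does not make the values monotonic by itself (see the replies below), but the consumer then only receives add records, each carrying the current full count/sum of the single result row:

endStream
  .filter(_._1)                       // drop retraction messages (flag == false)
  .map(x => s"ADD:${x._2.toString}")  // every record written is an accumulate
  .addSink(myProducer)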
Hi clay,

Are there any other lines after the last line in your picture? The final result should be eventually consistent and correct.

In your SQL there is a left join, a keyed group by, and a non-keyed group by. Both the left join and the keyed group by send retractions to the downstream non-keyed group by whenever there is an update. The retraction messages make the result value fluctuate, but the final result will be correct.

To get monotonic results, you can add another non-keyed group by with MAX.

Best, Hequn
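A sketch of what that extra layer could look like, building on the userTest table from the code above. The aliases cnt and total are assumptions here; the original SELECT list has no aliases, so they would need to be added for this to compile, and whether the outer MAX stays at the running maximum across retractions depends on Flink's MAX implementation:

// Register the existing result and wrap it in another non-keyed
// aggregation that takes the MAX of each output column.
tEnv.registerTable("raw_result", userTest)

val monotonicResult: Table = tEnv.sqlQuery(
  "SELECT MAX(cnt) AS cnt, MAX(total) AS total FROM raw_result")

// Sink monotonicResult instead of userTest.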
Hi,
you also need to keep the parallelism in mind. If your downstream operator or sink has a parallelism of 1 and your SQL query pipeline has a higher parallelism, the retract results are rebalanced and can arrive in the wrong order. For example, if you view the changelog in the SQL Client, the built-in SQL Client sink always has parallelism 1.

Regards,
Timo
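A minimal sketch of pinning the sink side explicitly, reusing endStream and myProducer from the earlier snippet (only relevant if the overall job parallelism is higher than 1):

// Force the map and sink operators to parallelism 1 so the
// accumulate/retract messages are not rebalanced and reordered.
endStream
  .map(x => s"${x._1}:${x._2.toString}").setParallelism(1)
  .addSink(myProducer).setParallelism(1)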
Hi Hequn,

I don't understand what you mean by the keyed group by and the non-keyed group by. Could you explain it in a little more detail, or give me an example? Thank you.

clay
Hi Timo,

I use env.setParallelism(1) in my code, i.e. I set the overall parallelism of the program to 1. Will some calculations still be parallelized in that case?

clay
Hi Clay,

If you do env.setParallelism(1), the query won't be executed in parallel. However, looking at your screenshot, the message order does not seem to be the problem here (given that you printed the content of the topic).

Are you sure it is not possible that the result decreases when some rows are added to one of the input tables? I don't have time to dig into your query, but the HAVING clause and the left join with the (u.id IS NULL) predicate look a bit suspicious to me. Would it be possible to create a minimal example that reproduces the issue?

Best, Fabian
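A hypothetical scenario (assumed data, only to illustrate how the left join with the u.id IS NULL predicate can legitimately lower the result):

t1: pack_order receives order O1 (student_id = 5, status = 'PAY_CONFIRMED', real_total_price = 600)
    -> no user1 row with id = 5 exists yet, so u.id IS NULL holds and O1 is counted
    -> emitted result: count = 1, sum = 600
t2: user1 receives a row with id = 5
    -> the join now matches, u.id IS NULL no longer holds, and O1 is retracted
    -> emitted result: count = 0, sum = null (a legitimate decrease)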
Hi clay,

Keyed group by:

  SELECT a, SUM(b) as d

Non-keyed group by:

  SELECT SUM(b) as d

I would like to look into the problem, but I can't find an obvious problem in the SQL. It would be great if you could provide a minimal example that reproduces the issue. Also, please use a print sink to avoid writing to multiple Kafka partitions, since that also causes out-of-order problems.

Best, Hequn
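To make the two snippets above self-contained, a hedged completion, assuming a hypothetical table T(a, b) registered in tEnv (the names are illustrative only, not from the thread):

// Keyed group by: one continuously updated result row per distinct key a.
val keyed: Table = tEnv.sqlQuery(
  "SELECT a, SUM(b) AS d FROM T GROUP BY a")

// Non-keyed group by: a single global result row that is updated on every
// change; updates from upstream joins and keyed aggregations arrive as
// retract/accumulate pairs, which is why its value can temporarily dip.
val nonKeyed: Table = tEnv.sqlQuery(
  "SELECT SUM(b) AS d FROM T")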