About the retract of the calculation result of flink sql


clay4444
Hi everyone,

I am running into a problem with Flink SQL. My query is as follows:

SELECT COUNT(DISTINCT mm.student_id), SUM(price)
FROM (
  SELECT a.student_id, a.confirm_date_time, SUM(a.real_total_price) AS price
  FROM (
    SELECT DISTINCT po.id AS order_id,
           po.student_id,
           po.create_id,
           po.paid_date_time,
           po.confirm_date_time,
           po.real_total_price
    FROM `pack_order` po
    LEFT JOIN user1 u ON u.id = po.student_id
    WHERE po.status = 'PAY_CONFIRMED'
      AND po.real_total_price > 500
      AND po.confirm_date_time >= '2018-09-27'
      AND po.confirm_date_time < '2018-09-28'
      AND po.type IN (1, 2)
      AND u.id IS NULL
  ) a
  JOIN pack_order1 por ON a.student_id = por.student_id
  GROUP BY a.student_id, a.confirm_date_time
  HAVING MIN(por.confirm_date_time) = a.confirm_date_time
) mm


I registered three tables from three Kafka topics (one topic per table, each
topic with a single partition), ran the query with Flink SQL, and wrote the
result to another Kafka topic, again using a single partition. The final
output is a count value and a sum value. As I understand it, the values
written to Kafka should only ever increase, but the actual output is as
follows:

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1671/%E5%9B%BE%E7%89%87.png>

The row marked in the figure is the result we expect as the final one, but
the last row written is different, so the final calculation result is wrong.

Looking at the output, the values first increase, then decrease, and then
increase again, which we cannot explain. We need the count and sum written to
Kafka to only ever increase, because our goal is a real-time display.

Can you please explain why this happens and how to avoid it?
Thank you very much.


best wishes!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: About the retract of the calculation result of flink sql

clay4444
This is how I write the final result to Kafka. Because KafkaTableSink does
not support retract mode, I convert the table to a retract stream myself. I
am not sure whether this approach affects the calculation result.

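import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val userTest: Table = tEnv.sqlQuery(sql)

// Each element is (flag, row): true = add message, false = retract message
val endStream = tEnv.toRetractStream[Row](userTest)

//userTest.insertInto("kafkaSink")

val myProducer = new FlinkKafkaProducer011[String](
  kafkaBrokers,            // broker list
  topic,                   // target topic
  new SimpleStringSchema)  // serialization schema

// Write every message, adds and retractions alike, as "<flag>:<row>"
endStream
  .map(x => s"${x._1}:${x._2.toString}")
  .addSink(myProducer)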




Re: About the retract of the calculation result of flink sql

Hequn Cheng
Hi clay,

Are there any more lines after the last line in your picture? The final result should be eventually consistent and correct.

In your SQL there is a left join, a keyed group by, and a non-keyed group by. Both the left join and the keyed group by send retractions to the downstream non-keyed group by whenever one of their results is updated. These retraction messages make the result value fluctuate. However, the final result will be correct.
To get monotonic results, you can add another non-keyed group by with max.
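
To make the fluctuation concrete, here is a minimal sketch in plain Scala. It is a simplified model, not Flink API code, and the names RetractionSketch, Msg, keyedSum and globalSum are made up for this illustration. It assumes that a keyed aggregation retracts a key's previous result before emitting the updated one, and that the non-keyed aggregation emits a new total after every incoming message.

```scala
// Simplified model of retraction; hypothetical names, not Flink API code.
object RetractionSketch {
  // A changelog message: isAdd = true accumulates a value, false retracts it.
  final case class Msg(isAdd: Boolean, value: Long)

  // Keyed aggregation (GROUP BY key, SUM(amount)): for every input row it
  // retracts the key's previous sum (if there is one), then emits the new sum.
  def keyedSum(rows: Seq[(String, Long)]): Seq[Msg] = {
    val sums = scala.collection.mutable.Map.empty[String, Long]
    rows.flatMap { case (key, amount) =>
      val previous = sums.get(key)
      val updated = previous.getOrElse(0L) + amount
      sums(key) = updated
      previous.toList.map(p => Msg(isAdd = false, value = p)) :+
        Msg(isAdd = true, value = updated)
    }
  }

  // Non-keyed aggregation (SUM over all keys): produces its running total
  // after every incoming message, so a retraction shows up as a dip.
  def globalSum(changelog: Seq[Msg]): Seq[Long] =
    changelog.scanLeft(0L) { (total, m) =>
      if (m.isAdd) total + m.value else total - m.value
    }.tail
}
```

With the input rows ("s1", 600), ("s2", 700), ("s1", 800), keyedSum emits add 600, add 700, retract 600, add 1400, and globalSum then produces 600, 1300, 700, 2100. The total dips to 700 when the retraction is processed and only afterwards reaches the correct final value.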

Best, Hequn.


In reply to clay4444's message of Sat, Sep 29, 2018, 3:47 PM.

Re: About the retract of the calculation result of flink sql

Timo Walther
Hi,

You also need to keep the parallelism in mind. If your downstream operator or sink has a parallelism of 1 and your SQL query pipeline has a higher parallelism, the retract results are rebalanced and can arrive in the wrong order. For example, if you view the changelog in the SQL Client, note that the built-in SQL Client sink always has a parallelism of 1.
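
As a rough illustration of why ordering matters, the sketch below is plain Scala, not Flink code. OrderingSketch, latestResult and the sample message values are hypothetical. It assumes a reader that treats the most recent add message (flag true) as the current result, as one might when looking at the last line of the output topic.

```scala
// Hypothetical model of a changelog reader; not Flink API code.
object OrderingSketch {
  // The value a reader sees if it takes the most recent add message
  // (flag = true) as the current result and ignores retractions.
  def latestResult(changelog: Seq[(Boolean, Long)]): Option[Long] =
    changelog.collect { case (true, value) => value }.lastOption

  // Messages in the order in which the query produced them.
  val inOrder: Seq[(Boolean, Long)] =
    Seq((true, 600L), (false, 600L), (true, 1400L), (false, 1400L), (true, 2100L))

  // The same messages, with the last update overtaking the previous one.
  val reordered: Seq[(Boolean, Long)] =
    Seq((true, 600L), (false, 600L), (false, 1400L), (true, 2100L), (true, 1400L))
}
```

Here latestResult(inOrder) is Some(2100), while latestResult(reordered) is Some(1400): both sequences contain exactly the same messages, yet the reordered one leaves a stale value as the last visible result.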

Regards,
Timo



In reply to Hequn Cheng's message of Sep 29, 2018, 17:02.

Re: About the retract of the calculation result of flink sql

clay4444
In reply to this post by Hequn Cheng
Hi Hequn,

I don't understand what you mean by keyed group by and non-keyed group by.
Could you explain it in a little more detail or give me an example? Thank you.

clay





Re: About the retract of the calculation result of flink sql

clay4444
In reply to this post by Timo Walther
Hi Timo,

I call env.setParallelism(1) in my code, which sets the overall parallelism
of the program to 1. Will some calculations still be executed in parallel
in that case?

clay





Re: About the retract of the calculation result of flink sql

Fabian Hueske-2
Hi Clay,

If you do env.setParallelism(1), the query won't be executed in parallel.
However, looking at your screenshot, the message order does not seem to be the problem here (given that you printed the content of the topic).

Are you sure the result cannot legitimately decrease when rows are added to one of the input tables?
I don't have time to dig into your query, but the HAVING clause or the left join and (u.id is null) predicate look a bit suspicious to me.
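
To illustrate how such a decrease could come about, here is a minimal sketch in plain Scala. It is not Flink code, and AntiJoinSketch, OrderArrived, UserArrived and countAfterEachEvent are hypothetical names. It assumes the simplified query shape "orders LEFT JOIN users ON u.id = o.student_id WHERE u.id IS NULL" and recomputes its COUNT(*) after every arriving row.

```scala
// Hypothetical model of a left join with an IS NULL filter; not Flink code.
object AntiJoinSketch {
  sealed trait Event
  final case class OrderArrived(studentId: Long) extends Event
  final case class UserArrived(id: Long) extends Event

  // COUNT(*) of orders that have no matching user, re-evaluated after
  // each event. A new user row can remove orders that were counted before.
  def countAfterEachEvent(events: Seq[Event]): Seq[Int] = {
    val snapshots = events.scanLeft((Vector.empty[Long], Set.empty[Long])) {
      case ((orders, users), OrderArrived(s)) => (orders :+ s, users)
      case ((orders, users), UserArrived(u))  => (orders, users + u)
    }
    snapshots.tail.map { case (orders, users) =>
      orders.count(s => !users.contains(s))
    }
  }
}
```

For the events OrderArrived(1), OrderArrived(2), UserArrived(1), OrderArrived(3), the counts are 1, 2, 1, 2. The count drops when the user row for student 1 arrives, because that order no longer satisfies the IS NULL predicate, even though rows were only ever added to the inputs.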

Would it be possible to create a minimal example that reproduces the issue?

Best, Fabian

In reply to clay4444's message of Mon, Oct 1, 2018, 15:11.

Re: About the retract of the calculation result of flink sql

Hequn Cheng
In reply to this post by clay4444
Hi clay,

Keyed group by: 
SELECT a, SUM(b) as d
FROM Orders
GROUP BY a
 
Non Keyed group by:
SELECT SUM(b) as d
FROM Orders

I would like to look into the problem, but I can't find any obvious problem in the SQL. It would be great if you could provide a minimal example that reproduces the issue. Also, use a print sink to avoid writing to multiple Kafka partitions, since that also causes out-of-order problems.

Best, Hequn
