Hi clay,
Are there any other lines after the last line in your picture? The final result should be eventual consistency and correct.
In your sql, there is a left join, a keyed group by and a non-keyed group by. Both of the left join and keyed group by will send retractions to the downstream non-keyed group by once there is an update. The retraction messages vibrate the result value. However, the final result will be correct.To get monotonous results, you can add another non-keyed group by with max.
Best, Hequn.
On Sat, Sep 29, 2018 at 3:47 PM clay4444 <[hidden email]> wrote:
My final calculation result is implemented in the following way when writing
to kafka, because KafkaTableSink does not support retract mode, I am not
sure whether this method will affect the calculation result.
val userTest: Table = tEnv.sqlQuery(sql)
val endStream = tEnv.toRetractStream[Row](userTest)
//userTest.insertInto("kafkaSink")
val myProducer = new FlinkKafkaProducer011[String](
kafkaBrokers, // broker list
topic, // target topic
new SimpleStringSchema) // serialization schema
endStream.map(x=>{
s"${x._1}:${x._2.toString}"
}).addSink(myProducer)
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Free forum by Nabble | Edit this page |