|
Hi all!
I need to compute accumulated value of 1 minute each 3 seconds, in other words, the output should be 3 seconds, and the content is the current 1 minute.
For accumulated value, I use a Value State to maintain previous value, but for some reason such as backPressure, the Value state is disordered after a while.
My code is below, anyone knows how to improve or solve it?
--
val watermarkTransaction = transactionDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockTransaction] {
var currentMaxTimestamp = 0L
val maxOutOfOrderness = 60000L
override def getCurrentWatermark: Watermark = {
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
override def extractTimestamp(t: StockTransaction, l: Long): Long = {
val timestamp = t.time
currentMaxTimestamp = Math.max(timestamp, l)
timestamp
}
})
val tx = watermarkTransaction.filter(f => {f.lastPrice != 0.0 && f.turnover != 0.0} )
.keyBy(_.code)
.flatMap(new TXRichFlatMapFunction)
val txOut = tx.keyBy(_.code).window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new TXWindowApply1Minute)
txOut.addSink(new TX1MinuteSinkToOracle)
|