DataStream value state and backPressure problem

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

DataStream value state and backPressure problem

LiZhe
Hi all!

I need to compute accumulated value of 1 minute  each 3 seconds, in other words, the output should be 3 seconds, and the content is the current 1 minute.

For accumulated value, I use a Value State to maintain previous value, but for some reason such as backPressure, the Value state is  disordered after a while.

My code is below, anyone knows how to improve or solve it?
--
val watermarkTransaction = transactionDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockTransaction] {

      var currentMaxTimestamp = 0L
      val maxOutOfOrderness = 60000L

      override def getCurrentWatermark: Watermark = {
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
      }

      override def extractTimestamp(t: StockTransaction, l: Long): Long = {
        val timestamp = t.time
        currentMaxTimestamp = Math.max(timestamp, l)
        timestamp
      }
    })


val tx = watermarkTransaction.filter(f => {f.lastPrice != 0.0 && f.turnover != 0.0} )
      .keyBy(_.code)
      .flatMap(new TXRichFlatMapFunction)

    val txOut = tx.keyBy(_.code).window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new TXWindowApply1Minute)

    txOut.addSink(new TX1MinuteSinkToOracle)