Valuestate is not saving the state

Posted by Balaji Rajagopalan on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Valuestate-is-not-saving-the-state-tp5732.html

I wrote the below code which will increment a counter for the data in the datastream, and when I print the counter each time it seems the value is reinitialised to 0, and it is not incrementing, any thoughts. 

class BookingCntFlatMapFunction extends RichFlatMapFunction[(Booking,Long,Long),(Booking,Long,Long)]
{


@transient var bookingCnt:ValueState[Int] = null

override def flatMap(in: (Booking, Long, Long), out: Collector[(Booking, Long, Long)]): Unit = {
var value = bookingCnt.value()
value += 1
System.out.println("current booking count "+value)

bookingCnt.update(value)
out.collect(in)
}

override def open( config:Configuration): Unit = {
val descriptor: ValueStateDescriptor[Int] = new ValueStateDescriptor[Int]("bookingcnt",
TypeInformation.of(new TypeHint[Int]() {}),0)
bookingCnt = getRuntimeContext().getState(descriptor);

}


}
Output of the program:
current booking count 1
current booking count 1
current booking count 1
current booking count 1
current booking count 1
current booking count 1
current booking count 1