I wrote the code below, which increments a counter for each element in the data stream. When I print the counter, the value seems to be reinitialised to 0 every time instead of incrementing. Any thoughts?

class BookingCntFlatMapFunction extends RichFlatMapFunction[(Booking,Long,Long),(Booking,Long,Long)]

Output of the program:
current booking count 1
current booking count 1
current booking count 1
current booking count 1
current booking count 1
current booking count 1
current booking count 1
Hi,

what is the input for each of those outputs? Could you maybe print this:

System.out.println(in + ", current booking count " + value)

Also, what is the key that you specify for your KeyedStream?

Cheers,
Aljoscha

> On 23 Mar 2016, at 11:53, Balaji Rajagopalan <[hidden email]> wrote:
>
> I wrote the below code which will increment a counter for the data in the datastream, and when I print the counter each time it seems the value is reinitialised to 0, and it is not incrementing, any thoughts.
>
> class BookingCntFlatMapFunction extends RichFlatMapFunction[(Booking,Long,Long),(Booking,Long,Long)]
> {
>
>   @transient var bookingCnt:ValueState[Int] = null
>
>   override def flatMap(in: (Booking, Long, Long), out: Collector[(Booking, Long, Long)]): Unit = {
>     var value = bookingCnt.value()
>     value += 1
>     System.out.println("current booking count "+value)
>
>     bookingCnt.update(value)
>     out.collect(in)
>   }
>
>   override def open( config:Configuration): Unit = {
>     val descriptor: ValueStateDescriptor[Int] = new ValueStateDescriptor[Int]("bookingcnt",
>       TypeInformation.of(new TypeHint[Int]() {}),0)
>     bookingCnt = getRuntimeContext().getState(descriptor);
>
>   }
>
> }
>
> Output of the program:
> current booking count 1
> current booking count 1
> current booking count 1
> current booking count 1
> current booking count 1
> current booking count 1
> current booking count 1
(Booking(te7uc4,compact,[hidden email],Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(tdr1ym,compact,[hidden email],Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(t9zvqw,compact,[hidden email],Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(tdr1e8,compact,[hidden email],Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(tdntcj,compact,[hidden email],Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(tdr1wv,compact,[hidden email],Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(tdr1wv,compact,[hidden email],Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1

The key is the email id from the booking object.

On Wed, Mar 23, 2016 at 4:32 PM, Aljoscha Krettek <[hidden email]> wrote:
> Hi,
Never mind, Aljoscha, I understand what is going on: the state is kept per key, so for each unique key the count starts again from 0.

On Wed, Mar 23, 2016 at 4:37 PM, Balaji Rajagopalan <[hidden email]> wrote:
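
For anyone who finds this thread later, the per-key behaviour can be illustrated without Flink at all. The following is only a rough sketch in plain Scala, not the actual Flink implementation: a mutable map stands in for the keyed ValueState[Int], and the names KeyedCounter and countPerKey as well as the example.com addresses are made up for illustration.

```scala
import scala.collection.mutable

// Plain-Scala model of keyed state: one independent counter per key.
// KeyedCounter and countPerKey are hypothetical names, not Flink API.
object KeyedCounter {

  // Returns (key, count after this element) for every element, in input order.
  def countPerKey(keys: Seq[String]): Seq[(String, Int)] = {
    // Stands in for ValueState[Int]; each key sees only its own entry.
    val state = mutable.Map.empty[String, Int]
    keys.map { key =>
      // 0 plays the role of the default value given to the state descriptor.
      val updated = state.getOrElse(key, 0) + 1
      state.update(key, updated)
      key -> updated
    }
  }

  def main(args: Array[String]): Unit =
    countPerKey(Seq("a@example.com", "b@example.com", "a@example.com"))
      .foreach { case (key, count) => println(s"$key, current booking count $count") }
}
```

In this model a key that shows up for the first time always yields 1, and the count only goes past 1 when the same key arrives again. If the intent is a single counter across all bookings, the stream would presumably have to be keyed on something shared by every element rather than on the email id.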