(Booking(te7uc4,compact,[hidden email],Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1(Booking(tdr1ym,compact,[hidden email],Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1(Booking(t9zvqw,compact,[hidden email],Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1(Booking(tdr1e8,compact,[hidden email],Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1(Booking(tdntcj,compact,[hidden email],Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1(Booking(tdr1wv,compact,[hidden email],Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1(Booking(tdr1wv,compact,[hidden email],Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1The key is email id from the booking object.On Wed, Mar 23, 2016 at 4:32 PM, Aljoscha Krettek <[hidden email]> wrote:Hi,
what is the input for each of those outputs? Could you maybe print this:
System.out.println(in + “, current booking count "+value)
Also, what is the key that you specify for your KeyedStream?
Cheers,
Aljoscha
> On 23 Mar 2016, at 11:53, Balaji Rajagopalan <[hidden email]> wrote:
>
> I wrote the below code which will increment a counter for the data in the datastream, and when I print the counter each time it seems the value is reinitialised to 0, and it is not incrementing, any thoughts.
>
> class BookingCntFlatMapFunction extends RichFlatMapFunction[(Booking,Long,Long),(Booking,Long,Long)]
> {
>
>
> @transient var bookingCnt:ValueState[Int] = null
>
> override def flatMap(in: (Booking, Long, Long), out: Collector[(Booking, Long, Long)]): Unit = {
> var value = bookingCnt.value()
> value += 1
> System.out.println("current booking count "+value)
>
> bookingCnt.update(value)
> out.collect(in)
> }
>
> override def open( config:Configuration): Unit = {
> val descriptor: ValueStateDescriptor[Int] = new ValueStateDescriptor[Int]("bookingcnt",
> TypeInformation.of(new TypeHint[Int]() {}),0)
> bookingCnt = getRuntimeContext().getState(descriptor);
>
> }
>
>
> }
> Output of the program:
> current booking count 1
> current booking count 1
> current booking count 1
> current booking count 1
> current booking count 1
> current booking count 1
> current booking count 1
>
Free forum by Nabble | Edit this page |