Valuestate is not saving the state


Balaji Rajagopalan
I wrote the code below to increment a counter for each element in the data stream. When I print the counter, the value seems to be reinitialised to 0 each time and never increments. Any thoughts?

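import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class BookingCntFlatMapFunction extends RichFlatMapFunction[(Booking, Long, Long), (Booking, Long, Long)] {

  // Handle to the state; created in open(), so it is not serialised with the function.
  @transient var bookingCnt: ValueState[Int] = null

  override def flatMap(in: (Booking, Long, Long), out: Collector[(Booking, Long, Long)]): Unit = {
    // Read the stored count (the default 0 if nothing has been stored yet), increment, write back.
    var value = bookingCnt.value()
    value += 1
    System.out.println("current booking count " + value)

    bookingCnt.update(value)
    out.collect(in)
  }

  override def open(config: Configuration): Unit = {
    // State named "bookingcnt" of type Int with default value 0.
    val descriptor: ValueStateDescriptor[Int] = new ValueStateDescriptor[Int]("bookingcnt",
      TypeInformation.of(new TypeHint[Int]() {}), 0)
    bookingCnt = getRuntimeContext().getState(descriptor)
  }
}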
Output of the program:
current booking count 1
current booking count 1
current booking count 1
current booking count 1
current booking count 1
current booking count 1
current booking count 1

Re: Valuestate is not saving the state

Aljoscha Krettek
Hi,
What is the input for each of those outputs? Could you maybe print this:

    System.out.println(in + ", current booking count " + value)

Also, what is the key that you specify for your KeyedStream?

Cheers,
Aljoscha

> On 23 Mar 2016, at 11:53, Balaji Rajagopalan <[hidden email]> wrote:

Re: Valuestate is not saving the state

Balaji Rajagopalan
(Booking(te7uc4,compact,[hidden email],Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(tdr1ym,compact,[hidden email],Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(t9zvqw,compact,[hidden email],Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(tdr1e8,compact,[hidden email],Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(tdntcj,compact,[hidden email],Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(tdr1wv,compact,[hidden email],Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(tdr1wv,compact,[hidden email],Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1

The key is the email ID from the Booking object.


On Wed, Mar 23, 2016 at 4:32 PM, Aljoscha Krettek <[hidden email]> wrote:

Re: Valuestate is not saving the state

Balaji Rajagopalan
Never mind, Aljoscha, I understand what is going on: the count is kept separately for each unique key, so every new key starts again from 0.
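The per-key behaviour described above can be illustrated with a small plain-Scala model. This is a sketch, not Flink code: `SimulatedKeyedIntState` and `PerKeyCountDemo` are hypothetical names, and the explicit `setCurrentKey` call stands in for the Flink runtime selecting the key of each incoming record before calling `flatMap`. The read/increment/write logic mirrors `BookingCntFlatMapFunction`, and the default value 0 matches the one given to the `ValueStateDescriptor`.

```scala
import scala.collection.mutable

// Toy model of a keyed ValueState[Int] (NOT Flink's API): one stored value per key,
// and only the value belonging to the current key is visible.
class SimulatedKeyedIntState(defaultValue: Int) {
  private val store = mutable.Map.empty[String, Int]
  private var currentKey: Option[String] = None

  // Hypothetical stand-in for the runtime switching to the key of the incoming record.
  def setCurrentKey(key: String): Unit = {
    currentKey = Some(key)
  }

  // Value stored for the current key, or the default if this key has not been seen yet.
  def value(): Int = store.getOrElse(currentKey.get, defaultValue)

  // Stores a value for the current key only; other keys are unaffected.
  def update(v: Int): Unit = {
    store(currentKey.get) = v
  }
}

object PerKeyCountDemo {
  // Same logic as BookingCntFlatMapFunction.flatMap: read, increment, write back.
  def countPerKey(keys: Seq[String]): Seq[(String, Int)] = {
    val bookingCnt = new SimulatedKeyedIntState(defaultValue = 0)
    keys.map { key =>
      bookingCnt.setCurrentKey(key)
      val value = bookingCnt.value() + 1
      bookingCnt.update(value)
      key -> value
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical email keys; the real ones are hidden in the archive.
    val keys = Seq("a@example.com", "b@example.com", "c@example.com", "a@example.com")
    countPerKey(keys).foreach { case (k, c) => println(s"$k current booking count $c") }
  }
}
```

Running `main` prints a count of 1 for each of the first three keys and 2 for the repeated `a@example.com`: a counter only grows when the same key shows up again. If every record carries a different email, every line prints 1, which is the same pattern as in the output above.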

On Wed, Mar 23, 2016 at 4:37 PM, Balaji Rajagopalan <[hidden email]> wrote: