Queryable State

Dawid Wysakowicz
Hi, I was experimenting with the Queryable State feature and I am having some problems querying the state.

The code which I use to produce the queryable state is:

    env.addSource(kafkaConsumer).map(
      e => e match {
        case LoginClickEvent(_, t) => ("login", 1, t)
        case LogoutClickEvent(_, t) => ("logout", 1, t)
        case ButtonClickEvent(_, _, t) => ("button", 1, t)
      }).keyBy(0).timeWindow(Time.seconds(1))
      .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
      .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
      .keyBy("key")
      .asQueryableState(
        "type-time-series-count",
        new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
          "type-time-series-count",
          classOf[KeyedDataPoint[java.lang.Integer]]))

As you can see, it is a rather simple job: I count events of different types in one-second windows and then query the counts by event type.
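For readers less familiar with the windowing operators, the following plain-Scala sketch approximates what map / keyBy(0) / timeWindow(Time.seconds(1)) / reduce(...) computes here: for each event type and one-second window, the number of events and the largest timestamp. It is not Flink code. The name WindowCountSketch and the (eventType, timestamp) input shape are hypothetical, and it assumes non-negative millisecond timestamps with windows aligned to multiples of the window size (Flink's default zero offset). Counting the events in a group is equivalent to summing the 1s that the map emits.

```scala
object WindowCountSketch {
  /** Groups (eventType, timestampMillis) pairs into tumbling windows,
    * counts the events per (type, window) and keeps the largest timestamp,
    * like the reduce function in the job.
    * Returns (eventType, maxTimestamp, count), sorted by window, then type.
    */
  def countPerWindow(events: Seq[(String, Long)], windowMillis: Long = 1000L): Seq[(String, Long, Int)] =
    events
      .groupBy { case (eventType, ts) => (eventType, ts / windowMillis) } // window index, assumes ts >= 0
      .toSeq
      .sortBy { case ((eventType, window), _) => (window, eventType) }
      .map { case ((eventType, _), inWindow) => (eventType, inWindow.map(_._2).max, inWindow.size) }
}
```

For example, two "login" events at 100 ms and 900 ms fall into the same window and yield ("login", 900, 2).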

In the client code I do:
    // Query Flink state
    val future = client.getKvState(jobId, "type-time-series-count", key.hashCode, seralizedKey)

    // Await async result
    val serializedResult: Array[Byte] = Await.result(
      future, new FiniteDuration(
        10,
        duration.SECONDS))

    // Deserialize response
    val results = deserializeResponse(serializedResult)

    results
  }

  private def deserializeResponse(serializedResult: Array[Byte]): util.List[KeyedDataPoint[lang.Integer]] = {
    KvStateRequestSerializer.deserializeList(serializedResult, getValueSerializer())
  }

While debugging the issue, I saw that the first element of the list is deserialized correctly, but deserialization fails on the second one. It looks as if the serialized result is broken. Do you have any idea whether I am doing something wrong or whether this is a bug?


The exception I get is:
java.io.EOFException: null
at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
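The symptom (first element decoded, EOFException inside readUTF on the second) is consistent with the job and the client disagreeing on the byte layout of a single element. The toy model below illustrates that mechanism under stated assumptions; it is not Flink's PojoSerializer or KvStateRequestSerializer code, and MisalignedListDemo, writeList and readList are hypothetical names. It assumes elements of the form (UTF key, Int count) joined by a single ',' separator byte that the reader skips without checking, and it simulates the mismatch by letting the writer append one extra byte per element. With matching layouts both elements round-trip. With the extra byte, the first element still decodes correctly, but the reader then treats the ',' separator as the high byte of a UTF length prefix (0x2C00 = 11264 bytes) and runs out of input.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException}
import scala.collection.mutable.ArrayBuffer

object MisalignedListDemo {
  type Elem = (String, Int)

  /** Writes elements separated by a single ',' byte.
    * If extraTrailingByte is true, each element carries one byte the reader
    * below does not expect (a stand-in for a writer/reader serializer mismatch).
    */
  def writeList(elems: Seq[Elem], extraTrailingByte: Boolean): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    elems.zipWithIndex.foreach { case ((key, count), i) =>
      out.writeUTF(key)
      out.writeInt(count)
      if (extraTrailingByte) out.writeByte(0)
      if (i < elems.size - 1) out.writeByte(',') // separator between elements
    }
    out.flush()
    bytes.toByteArray
  }

  /** Reads (UTF key, Int count) elements, skipping one byte between them.
    * Returns everything decoded so far plus the EOFException, if one occurred.
    */
  def readList(data: Array[Byte]): (Vector[Elem], Option[EOFException]) = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val decoded = ArrayBuffer.empty[Elem]
    try {
      while (in.available() > 0) {
        val key = in.readUTF()
        val count = in.readInt()
        decoded += ((key, count))
        if (in.available() > 0) in.readByte() // assumed separator, not validated
      }
      (decoded.toVector, None)
    } catch {
      case e: EOFException => (decoded.toVector, Some(e))
    }
  }
}
```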

You can browse the exact code at: https://github.com/dawidwys/flink-intro

I would be grateful for any advice.

Regards
Dawid Wysakowicz

Re: Queryable State

Ufuk Celebi
Hey Dawid! Thanks for reporting this. I will try to have a look over
the course of the day. From a first impression, this seems like a bug
to me.

On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
<[hidden email]> wrote:

Re: Queryable State

Dawid Wysakowicz
Hey Ufuk,
did you maybe have a chance to look at that problem?

2017-01-09 10:47 GMT+01:00 Ufuk Celebi <[hidden email]>:

Re: Queryable State

Nico Kruber
Hi Dawid,
I'll try to reproduce the error in the next couple of days. Can you also share
the value deserializer you use? Also, have you tried even smaller examples in
the meantime? Did they work?

As a general side note on the queryable state "sink" that uses ListState
(".asQueryableState(<name>, ListStateDescriptor)"): everything that enters
this operator is stored forever and never cleaned up. Eventually it will
use too much memory, so it is of limited use. Maybe it should even be
removed from the API.
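To put the side note into numbers, here is a plain in-memory Scala model; it is not Flink's state backend, and SinkGrowthSketch, WindowResult, appendOnly, latestOnly and simulate are hypothetical names. It assumes the job above emits one window result per event type per second, and compares an append-only list per key (as in the queryable ListState sink) with a store that overwrites the value per key. After one hour with three event types, the append-only variant holds 3 * 3600 = 10800 entries and keeps growing, while the overwriting variant holds 3.

```scala
object SinkGrowthSketch {
  final case class WindowResult(key: String, windowEnd: Long, count: Int)

  // Append-only list per key: nothing is ever evicted.
  def appendOnly(results: Seq[WindowResult]): Map[String, Vector[WindowResult]] =
    results.foldLeft(Map.empty[String, Vector[WindowResult]]) { (state, r) =>
      state.updated(r.key, state.getOrElse(r.key, Vector.empty[WindowResult]) :+ r)
    }

  // Overwrite per key: only the latest result for each key is kept.
  def latestOnly(results: Seq[WindowResult]): Map[String, WindowResult] =
    results.foldLeft(Map.empty[String, WindowResult]) { (state, r) => state.updated(r.key, r) }

  // Assumed input: one result per key for every second in [0, seconds).
  def simulate(keys: Seq[String], seconds: Int): Seq[WindowResult] =
    for (s <- 0 until seconds; k <- keys) yield WindowResult(k, (s + 1) * 1000L, 1)
}
```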


Nico

On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:

Re: Queryable State

Dawid Wysakowicz
Hi Nico,

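I recently tried queryable state a bit differently, using a ValueState whose value is a util.ArrayList together with a ValueSerializer for util.ArrayList, and it works as expected.

The non-working example you can browse here:
https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a70a6fe2219
The working example here:
https://github.com/dawidwys/flink-intro/tree/master
(The QueryableJob is in the flink-queryable-job module and the QueryClient in flink-state-server.)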

Sure, I am aware of the downside of the ListState. I need it only for presentation purposes, but you may be right that there is no production use for this state and that it should be removed.
Maybe the problem lies only in the ListState, and removing it would also resolve my problem :)

Regards
Dawid Wysakowicz


2017-01-13 18:50 GMT+01:00 Nico Kruber <[hidden email]>:

Re: Queryable State

Ufuk Celebi
Hey Dawid! I talked offline with Nico last week and he took this over.
He also suggested removing the list variant of queryable state
altogether, which makes a lot of sense to me (at least given the current
state of things). @Nico: could you open an issue for it?

Nico also found a difference between how you obtain the type
information in the job and in your client code (classOf vs. manually
created type information), which could be the root cause of the initial problem.

– Ufuk


On Sat, Jan 14, 2017 at 2:03 PM, Dawid Wysakowicz
<[hidden email]> wrote:

> The non-working example you can browse here:
> https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a70a6fe2219
> The working example here:
> https://github.com/dawidwys/flink-intro/tree/master
> (The QueryableJob is in module flink-queryable-job and the QueryClient in
> flink-state-server)

Re: Queryable State

Nico Kruber
In reply to this post by Dawid Wysakowicz
Hi Dawid,
regarding the original code: I couldn't reproduce this with the Java code I
wrote, and my guess is that the second parameter of the ListStateDescriptor is
wrong:

      .asQueryableState(
        "type-time-series-count",
        new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
          "type-time-series-count",
          classOf[KeyedDataPoint[java.lang.Integer]]))

this should rather be

TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})

as in the query itself. It seems strange to me that you don't get any
ClassCastException or a compile-time error due to the type being wrong, but I
lack the Scala knowledge to get to the bottom of this.
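A likely explanation for why the Scala code compiles without complaint, offered as a hedged sketch rather than a confirmed diagnosis: classOf[KeyedDataPoint[java.lang.Integer]] is a well-typed Class[KeyedDataPoint[java.lang.Integer]], but because of type erasure the runtime Class object is the same one you get for any other type argument. Anything that receives only the Class, such as the ListStateDescriptor(String, Class<T>) constructor, therefore cannot see from it that T is Integer, whereas an anonymous TypeHint subclass keeps the full generic type. The KeyedDataPoint below is a hypothetical stand-in for the class in the linked repository, and ErasureSketch is a hypothetical name.

```scala
object ErasureSketch {
  // Hypothetical stand-in for KeyedDataPoint[T] from the linked repository.
  class KeyedDataPoint[T](val key: String, val timestamp: Long, val value: T)

  // Both expressions compile, and their static types differ...
  val withInteger: Class[KeyedDataPoint[java.lang.Integer]] = classOf[KeyedDataPoint[java.lang.Integer]]
  val withString: Class[KeyedDataPoint[String]] = classOf[KeyedDataPoint[String]]

  // ...but at runtime they are one and the same Class object.
  def sameRuntimeClass: Boolean = withInteger eq withString

  // The Class only knows its declared type variable, not the argument used at the call site.
  def declaredTypeParameters: Seq[String] =
    classOf[KeyedDataPoint[_]].getTypeParameters.toSeq.map(_.getName)
}
```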


Regarding the removal of the queryable list state "sink", I created a JIRA
issue for it and will open a PR:
https://issues.apache.org/jira/browse/FLINK-5507


Nico

On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:

Re: Queryable State

Dawid Wysakowicz
Hi Nico, Ufuk,

Thanks for diving into this issue.

@Nico

I don't think that's the problem. The code can be reproduced exactly in Java. I just use a different ListStateDescriptor constructor than you did:

You used:
    public ListStateDescriptor(String name, TypeInformation<T> typeInfo)

while I used:
    public ListStateDescriptor(String name, Class<T> typeClass)

I think the problem is the way I deserialized the value on the QueryClient side. I tried to use:

    KvStateRequestSerializer.deserializeList(serializedResult, {
      TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
        .createSerializer(new ExecutionConfig)
    })

I have not checked it, but now I suspect this code would work:

    KvStateRequestSerializer.deserializeValue(serializedResult, {
      TypeInformation.of(new TypeHint[util.List[KeyedDataPoint[lang.Integer]]]() {})
        .createSerializer(new ExecutionConfig)
    })
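The two calls above assume different wire formats: deserializeList decodes the bytes element by element with an element serializer, while deserializeValue hands the whole byte array to a single serializer, here one for util.List. Which layout the server actually sends is not settled in this thread. The toy codecs below (plain Int lists, hypothetical names, not Flink's exact byte layouts) contrast a separator-delimited layout with a size-prefixed one to show that the reader has to match the writer's framing: each round-trips with its own reader, while decoding one with the other can fail or misread.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object ListEncodingsSketch {
  private def encode(write: DataOutputStream => Unit): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    write(out)
    out.flush()
    bytes.toByteArray
  }

  // Layout A: elements back to back, one ',' byte between them, no length.
  def writeDelimited(xs: Seq[Int]): Array[Byte] = encode { out =>
    xs.zipWithIndex.foreach { case (x, i) =>
      out.writeInt(x)
      if (i < xs.size - 1) out.writeByte(',')
    }
  }

  // Layout B: element count first, then the elements, no separators.
  def writeSizePrefixed(xs: Seq[Int]): Array[Byte] = encode { out =>
    out.writeInt(xs.size)
    xs.foreach(x => out.writeInt(x))
  }

  // Reads layout A until the input is exhausted, skipping one byte between elements.
  def readDelimited(data: Array[Byte]): Vector[Int] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val result = Vector.newBuilder[Int]
    while (in.available() > 0) {
      result += in.readInt()
      if (in.available() > 0) in.readByte()
    }
    result.result()
  }

  // Reads layout B: the count tells the reader how many elements follow.
  def readSizePrefixed(data: Array[Byte]): Vector[Int] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    Vector.fill(in.readInt())(in.readInt())
  }
}
```

For the list (3, 5, 7), layout A takes 3 * 4 + 2 = 14 bytes and layout B takes 4 + 3 * 4 = 16 bytes.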

Regarding removing the queryable list state, I agree: using it seems pointless. Moreover, while removing it, I would take a second look at these functions:
    KvStateRequestSerializer.deserializeList
    KvStateRequestSerializer.serializeList
as I think they are not even used right now. Thanks for your time.

Regards
Dawid Wysakowicz

2017-01-16 13:25 GMT+01:00 Nico Kruber <[hidden email]>:
Hi Dawid,
regarding the original code, I couldn't reproduce this with the Java code I
wrote, and my guess is that the second parameter of the ListStateDescriptor is
wrong:

      .asQueryableState(
        "type-time-series-count",
        new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
          "type-time-series-count",
          classOf[KeyedDataPoint[java.lang.Integer]]))

this should rather be

    TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})

as in the query itself. It sounds strange to me that you don't get any
ClassCastException or a compile-time error due to the type being wrong, but I
lack the Scala knowledge to get to the bottom of this.
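One plausible explanation for the missing compile-time error is JVM type erasure. In Scala, classOf[KeyedDataPoint[java.lang.Integer]] type-checks as a Class[KeyedDataPoint[Integer]], but the runtime Class object is only the raw class, so the Integer argument is invisible to anything that inspects it. A TypeHint, by contrast, is created as an anonymous subclass (new TypeHint[...]() {}), and an anonymous subclass records its full type argument in its generic signature. The sketch below shows that difference with plain JDK reflection. TypeToken and the KeyedDataPoint stand-in are hypothetical names for illustration; this is not Flink's TypeHint implementation, and it does not show what Flink's type extraction does with the raw class.

```scala
import java.lang.reflect.{ParameterizedType, Type}

// Hypothetical stand-in for the KeyedDataPoint class used in the thread.
class KeyedDataPoint[T](var key: String, var timestamp: Long, var value: T)

// Minimal "super type token": an anonymous subclass stores its type argument
// in the class file's generic signature, where reflection can read it back.
// Illustrates the idea behind TypeHint; it is not Flink code.
abstract class TypeToken[T] {
  val capturedType: Type =
    getClass.getGenericSuperclass
      .asInstanceOf[ParameterizedType]
      .getActualTypeArguments()(0)
}

object ErasureDemo {
  def main(args: Array[String]): Unit = {
    // Compiles fine: statically this is a Class[KeyedDataPoint[Integer]] ...
    val raw: Class[KeyedDataPoint[java.lang.Integer]] =
      classOf[KeyedDataPoint[java.lang.Integer]]
    // ... but at runtime it is the raw class; its parameter T stays unresolved.
    println(raw.getTypeParameters.map(_.getName).mkString(", "))

    // The anonymous subclass keeps the full parameterized type.
    val token = new TypeToken[KeyedDataPoint[java.lang.Integer]] {}
    println(token.capturedType.getTypeName)
  }
}
```

Running ErasureDemo, the first line shows only the unresolved parameter name of the raw class, while the second shows the parameterized type, including java.lang.Integer, recovered from the anonymous subclass.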


Regarding the removal of the queryable list state "sink", I created a JIRA
issue for it and will open a PR:
https://issues.apache.org/jira/browse/FLINK-5507


Nico

On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> Hi Nico,
>
> Recently I've tried queryable state a bit differently, using a ValueState
> whose value is a util.ArrayList, together with a ValueSerializer for
> util.ArrayList, and it works as expected.
>
> You can browse the non-working example here:
> https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a70a6fe2219
> and the working example here:
> https://github.com/dawidwys/flink-intro/tree/master
> (The QueryableJob is in the flink-queryable-job module and the QueryClient
> in flink-state-server.)
>
> Sure, I am aware of the downside of the ListState. I need it just for
> presentation purposes, but you may be right that there might be no
> production use for this state and that it should be removed.
> Maybe the problem is just with the ListState, and removing it would also
> resolve my problem :)
>
> Regards
> Dawid Wysakowicz
>
> 2017-01-13 18:50 GMT+01:00 Nico Kruber <[hidden email]>:
> > Hi Dawid,
> > I'll try to reproduce the error in the next couple of days. Can you also
> > share the value deserializer you use? Also, have you tried even smaller
> > examples in the meantime? Did they work?
> >
> > As a side note regarding the queryable state "sink" using ListState
> > (".asQueryableState(<name>, ListStateDescriptor)"): everything that enters
> > this operator will be stored forever and never cleaned up. Eventually, it
> > will pile up too much memory and is thus of limited use. Maybe it should
> > even be removed from the API.
> >
> >
> > Nico
> >
> > On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > > Hey Ufuk.
> > > Did you maybe have time to look at that problem?
> > >
> > > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi <[hidden email]>:
> > > > Hey Dawid! Thanks for reporting this. I will try to have a look over
> > > > the course of the day. From a first impression, this seems like a bug
> > > > to me.
> > > >
> > > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > > > <[hidden email]> wrote:
> > > > > Hi, I was experimenting with the Query State feature and I have some
> > > > > problems querying the state.
> > > > >
> > > > > The code which I use to produce the queryable state is:
> > > > >
> > > > >     env.addSource(kafkaConsumer).map(
> > > > >       e => e match {
> > > > >         case LoginClickEvent(_, t) => ("login", 1, t)
> > > > >         case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > > >         case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > > > >       }).keyBy(0).timeWindow(Time.seconds(1))
> > > > >       .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> > > > >       .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> > > > >       .keyBy("key")
> > > > >       .asQueryableState(
> > > > >         "type-time-series-count",
> > > > >         new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > > > >           "type-time-series-count",
> > > > >           classOf[KeyedDataPoint[java.lang.Integer]]))
> > > > >
> > > > > As you see, it is a rather simple job in which I try to count events of
> > > > > different types in windows and then query by event type.
> > > > >
> > > > > In client code I do:
> > > > >
> > > > >     // Query Flink state
> > > > >     val future = client.getKvState(jobId, "type-time-series-count",
> > > > >       key.hashCode, seralizedKey)
> > > > >
> > > > >     // Await async result
> > > > >     val serializedResult: Array[Byte] = Await.result(
> > > > >       future, new FiniteDuration(10, duration.SECONDS))
> > > > >
> > > > >     // Deserialize response
> > > > >     val results = deserializeResponse(serializedResult)
> > > > >
> > > > >     results
> > > > >   }
> > > > >
> > > > >   private def deserializeResponse(serializedResult: Array[Byte]):
> > > > >       util.List[KeyedDataPoint[lang.Integer]] = {
> > > > >     KvStateRequestSerializer.deserializeList(serializedResult, getValueSerializer())
> > > > >   }
> > > > >
> > > > > While debugging the issue, I saw that the first element in the list
> > > > > gets deserialized correctly, but it fails on the second one. It seems
> > > > > like the serialized result is broken. Do you have any idea whether I am
> > > > > doing something wrong or whether there is a bug?
> > > > >
> > > > > The exception I get is:
> > > > >
> > > > >     java.io.EOFException: null
> > > > >       at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
> > > > >       at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
> > > > >       at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
> > > > >       at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
> > > > >       at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
> > > > >
> > > > > You can browse the exact code at: https://github.com/dawidwys/flink-intro
> > > > >
> > > > > I would be grateful for any advice.
> > > > >
> > > > > Regards
> > > > > Dawid Wysakowicz

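Nico's side note about the ListState "sink" can be made concrete with a toy model. The sketch below is a plain Scala simulation, not Flink code: AppendingSink adds every incoming element under its key and never removes anything, which is the behaviour Nico describes for .asQueryableState(<name>, ListStateDescriptor), while OverwritingSink is a contrasting store, assumed purely for illustration, that keeps only the latest element per key. Both class names are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical model of a list-based sink: every element is appended under
// its key and nothing is ever removed.
final class AppendingSink[K, V] {
  private val state = mutable.Map.empty[K, mutable.ArrayBuffer[V]]
  def add(key: K, value: V): Unit = {
    state.getOrElseUpdate(key, mutable.ArrayBuffer.empty[V]) += value
    ()
  }
  def storedElements: Int = state.valuesIterator.map(_.size).sum
}

// Contrasting (assumed) sink that keeps only the latest element per key.
final class OverwritingSink[K, V] {
  private val state = mutable.Map.empty[K, V]
  def update(key: K, value: V): Unit = state(key) = value
  def storedElements: Int = state.size
}

object SinkGrowth {
  def main(args: Array[String]): Unit = {
    val appending = new AppendingSink[String, Int]
    val overwriting = new OverwritingSink[String, Int]
    val eventTypes = Seq("login", "logout", "button")
    // Simulate one window result per event type per second for one hour.
    for (second <- 0 until 3600; eventType <- eventTypes) {
      appending.add(eventType, second)
      overwriting.update(eventType, second)
    }
    println(s"appending sink holds ${appending.storedElements} elements")
    println(s"overwriting sink holds ${overwriting.storedElements} elements")
  }
}
```

With one window result per event type per second, the appending store holds 3600 * 3 = 10800 elements after one simulated hour and keeps growing with every further window, while the overwriting store stays at three entries, one per event type.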

Re: Queryable State

Nico Kruber
Hi Dawid,
sorry for the late reply. I was fixing some issues in queryable state and may
now have found the cause of your error: you may be seeing a race condition
with the MemoryStateBackend (the default state backend), as described here:
https://issues.apache.org/jira/browse/FLINK-5642
I'm currently working on a fix.

KvStateRequestSerializer#deserializeList(), however, is the right function to
deserialise list state; KvStateRequestSerializer#deserializeValue() will
not work!
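Nico's remark that deserializeList, not deserializeValue, is the right call suggests that list state comes back as its elements written one after another rather than as a single serialized java.util.List. The Scala sketch below assumes a framing in which elements are written back to back with a one-byte separator between them; that is our reading of the Flink 1.2 KvStateRequestSerializer#deserializeList loop, so check the source of the version you use. DataPoint, serializeList, deserializeList and deserializeIgnoringSeparator here are hypothetical stand-ins built on java.io only. The misaligned reader shows how bytes that do not match what the reader expects can decode the first element and then fail with java.io.EOFException inside readUTF, the same pattern as in the stack trace quoted in this thread; it does not reproduce the FLINK-5642 race itself.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException}

// Hypothetical element type, loosely modelled on the thread's KeyedDataPoint.
final case class DataPoint(key: String, timestamp: Long, value: Int)

object ListFraming {
  // Element codec: writes the fields one after another, roughly like a POJO serializer.
  def writePoint(p: DataPoint, out: DataOutputStream): Unit = {
    out.writeUTF(p.key)
    out.writeLong(p.timestamp)
    out.writeInt(p.value)
  }

  def readPoint(in: DataInputStream): DataPoint =
    DataPoint(in.readUTF(), in.readLong(), in.readInt())

  // Assumed list framing: elements back to back, one separator byte between them.
  def serializeList(points: Seq[DataPoint]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    points.zipWithIndex.foreach { case (p, i) =>
      if (i > 0) out.writeByte(',') // separator before every element but the first
      writePoint(p, out)
    }
    out.flush()
    bytes.toByteArray
  }

  // Reader that consumes the separator after each element (the assumed deserializeList loop).
  def deserializeList(data: Array[Byte]): List[DataPoint] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val result = List.newBuilder[DataPoint]
    while (in.available() > 0) {
      result += readPoint(in)
      if (in.available() > 0) in.readByte() // skip the separator
    }
    result.result()
  }

  // Reader that ignores the separator: the second read starts on the separator byte.
  def deserializeIgnoringSeparator(data: Array[Byte]): List[DataPoint] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val result = List.newBuilder[DataPoint]
    while (in.available() > 0) result += readPoint(in)
    result.result()
  }

  def main(args: Array[String]): Unit = {
    val bytes = serializeList(Seq(DataPoint("login", 1000L, 3), DataPoint("logout", 2000L, 5)))
    println(deserializeList(bytes))
    try deserializeIgnoringSeparator(bytes)
    catch { case e: EOFException => println(s"misaligned read failed: $e") }
  }
}
```

In main, the separator-aware reader returns both points. The other reader decodes the "login" point correctly, then starts the second read on the separator byte: readUTF interprets that byte and the next one as a string length of 11264 bytes and runs out of input.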

Thanks for the tip regarding KvStateRequestSerializer#serializeList: it was
indeed not used, since the list state backends had their own serialisation
function.
We removed KvStateRequestSerializer#serializeList as well as the queryable
list state sink for 1.2 and up.


Nico

On Monday, 16 January 2017 14:47:59 CET Dawid Wysakowicz wrote:

> Hi Nico, Ufuk,
>
> Thanks for diving into this issue.
>
> @Nico
>
> I don't think that's the problem. The code can be reproduced exactly in
> Java. I am using a different ListStateDescriptor constructor than you did:
>
> You used:
>
>     public ListStateDescriptor(String name, TypeInformation<T> typeInfo)
>
> While I used:
>
>     public ListStateDescriptor(String name, Class<T> typeClass)
>
> I think the problem is the way I deserialized the value on the
> QueryClient side. I tried to use:
>
>     KvStateRequestSerializer.deserializeList(serializedResult, {
>       TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
>         .createSerializer(new ExecutionConfig)
>     })
>
> I have not checked it, but now I suspect this code would work:
>
>     KvStateRequestSerializer.deserializeValue(serializedResult, {
>       TypeInformation.of(new TypeHint[util.List[KeyedDataPoint[lang.Integer]]]() {})
>         .createSerializer(new ExecutionConfig)
>     })
>
> Regarding removing the queryable list state, I agree; using it seems
> pointless. Moreover, while removing it, I would take a second look at
> these functions:
>
>     KvStateRequestSerializer.deserializeList
>     KvStateRequestSerializer.serializeList
>
> as I think they are not used at all, even now. Thanks for your time.
>
> Regards
> Dawid Wysakowicz



Re: Queryable State

Dawid Wysakowicz
Hi Nico,

No problem at all; I've already presented my showcase using a ValueStateDescriptor.

Anyway, if I can help you with queryable state somehow, let me know. I would be happy to contribute some code.

2017-01-25 14:47 GMT+01:00 Nico Kruber <[hidden email]>:
Hi Dawid,
sorry for the late reply. I was fixing some issues in queryable state and may
now have found the cause of your error: you may be seeing a race condition
with the MemoryStateBackend (the default state backend), as described here:
https://issues.apache.org/jira/browse/FLINK-5642
I'm currently working on a fix.

KvStateRequestSerializer#deserializeList(), however, is the right function to
deserialise list state; KvStateRequestSerializer#deserializeValue() will
not work!

Thanks for the tip regarding KvStateRequestSerializer#serializeList: it was
indeed not used, since the list state backends had their own serialisation
function.
We removed KvStateRequestSerializer#serializeList as well as the queryable
list state sink for 1.2 and up.


Nico
