Hi, I was experimenting with the Queryable State feature and I have some problems querying the state.
The code I use to produce the queryable state is:

    env.addSource(kafkaConsumer).map(
      e => e match {
        case LoginClickEvent(_, t) => ("login", 1, t)
        case LogoutClickEvent(_, t) => ("logout", 1, t)
        case ButtonClickEvent(_, _, t) => ("button", 1, t)
      }).keyBy(0).timeWindow(Time.seconds(1))
      .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
      .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
      .keyBy("key")
      .asQueryableState(
        "type-time-series-count",
        new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
          "type-time-series-count",
          classOf[KeyedDataPoint[java.lang.Integer]]))

As you can see, it is a rather simple job in which I count events of different types in 1-second windows and then query by event type.

In the client code I do:

      // Query Flink state
      val future = client.getKvState(jobId, "type-time-series-count",
        key.hashCode, seralizedKey)

      // Await async result
      val serializedResult: Array[Byte] = Await.result(
        future, new FiniteDuration(10, duration.SECONDS))

      // Deserialize response
      val results = deserializeResponse(serializedResult)

      results
    }

    private def deserializeResponse(serializedResult: Array[Byte]): util.List[KeyedDataPoint[lang.Integer]] = {
      KvStateRequestSerializer.deserializeList(serializedResult, getValueSerializer())
    }

While debugging the issue I saw that the first element in the list gets deserialized correctly, but deserialization fails on the second one. It seems like the serialized result is broken. Do you have any idea whether I am doing something wrong or whether there is a bug?
The exception I get is:

    java.io.EOFException: null
        at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
        at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
        at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
        at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)

You can browse the exact code at: https://github.com/dawidwys/flink-intro

I would be grateful for any advice.

Regards
Dawid Wysakowicz
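The symptom (the first element decodes, the second fails inside `readUTF`) is what a reader produces when the list bytes are not laid out the way it expects. The plain-Java sketch below is not Flink code. `Point` is a hypothetical stand-in for `KeyedDataPoint`, and the layout is an assumption made for illustration: elements are written back to back with a one-byte separator between them (`','` is an arbitrary choice). If the writer leaves out the separators, the reader decodes the first element, consumes one byte of the second element's string length as a "separator", and then reads a bogus length (0x066C = 1644 bytes). At that point `readFully` runs off the end of the buffer with an `EOFException`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

/** Hypothetical stand-in for KeyedDataPoint: (key, timestamp, value). Not Flink's binary format. */
final class Point {
    final String key;
    final long timestamp;
    final int value;

    Point(String key, long timestamp, int value) {
        this.key = key;
        this.timestamp = timestamp;
        this.value = value;
    }

    void write(DataOutputStream out) throws IOException {
        out.writeUTF(key);              // 2-byte length, then the characters
        out.writeLong(timestamp);
        out.writeInt(value);
    }

    static Point read(DataInputStream in) throws IOException {
        String key = in.readUTF();      // on a misaligned stream, the next 2 bytes become a bogus length
        long timestamp = in.readLong();
        int value = in.readInt();
        return new Point(key, timestamp, value);
    }
}

final class SeparatedListDemo {
    static final int SEPARATOR = ','; // assumed one-byte separator between elements

    /** Writes the elements back to back, with or without a separator byte between them. */
    static byte[] encode(List<Point> points, boolean withSeparator) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (int i = 0; i < points.size(); i++) {
                if (i > 0 && withSeparator) {
                    out.writeByte(SEPARATOR);
                }
                points.get(i).write(out);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
    }

    /**
     * Reads elements until the buffer is exhausted, skipping one separator byte between
     * elements. Decoded elements go into {@code sink}; the exception (if any) is returned
     * instead of thrown, so the partial result stays visible to the caller.
     */
    static IOException decodeInto(byte[] data, List<Point> sink) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        try {
            while (in.available() > 0) {
                sink.add(Point.read(in));
                if (in.available() > 0) {
                    in.readByte(); // expected separator
                }
            }
            return null;
        } catch (IOException e) {
            return e;
        }
    }
}
```

With `withSeparator = false`, `decodeInto` adds only the first point to `sink` and returns an `EOFException`, which mirrors the "first element OK, second one fails" pattern. Later in the thread Nico suggests a race condition (FLINK-5642) as the likely real cause. The sketch only shows how any such disagreement about the byte layout surfaces.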
Hey Dawid! Thanks for reporting this. I will try to have a look over
the course of the day. From a first impression, this seems like a bug to me.

On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz wrote:
Hey Ufuk,

Did you maybe have a moment to look at that problem?

2017-01-09 10:47 GMT+01:00 Ufuk Celebi:
Hi Dawid,
I'll try to reproduce the error in the next couple of days. Can you also share the value deserializer you use? Also, have you tried even smaller examples in the meantime? Did they work?

As a general side note on the queryable state "sink" that uses ListState (".asQueryableState(<name>, ListStateDescriptor)"): everything that enters this operator is stored forever and never cleaned up. Eventually it will use too much memory, so it is of limited use. Maybe it should even be removed from the API.

Nico

On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
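Nico's side note about the list "sink" can be made concrete with a small plain-Java model. This is not Flink code, and all class and method names are hypothetical. With the job's 1-second windows, each key gains up to one list entry per second, which is about 86,400 entries per key per day, and nothing ever removes them. The bounded variant is included only for comparison. It keeps the last N results per key and is a swapped-in technique, not something `asQueryableState` offers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of per-key state that receives one result per window firing. */
final class SinkGrowthDemo {
    /** Append-only, like the queryable list "sink": every result is kept forever. */
    static final class AppendOnlySink {
        private final Map<String, List<Integer>> state = new HashMap<>();

        void add(String key, int count) {
            state.computeIfAbsent(key, k -> new ArrayList<>()).add(count);
        }

        int size(String key) {
            List<Integer> values = state.get(key);
            return values == null ? 0 : values.size();
        }
    }

    /** Bounded alternative for comparison: only the most recent `capacity` results per key. */
    static final class BoundedSink {
        private final int capacity;
        private final Map<String, Deque<Integer>> state = new HashMap<>();

        BoundedSink(int capacity) {
            this.capacity = capacity;
        }

        void add(String key, int count) {
            Deque<Integer> recent = state.computeIfAbsent(key, k -> new ArrayDeque<>());
            recent.addLast(count);
            if (recent.size() > capacity) {
                recent.removeFirst(); // evict the oldest result
            }
        }

        int size(String key) {
            Deque<Integer> recent = state.get(key);
            return recent == null ? 0 : recent.size();
        }
    }

    /** Feeds `firings` window results for one key; returns {append-only size, bounded size}. */
    static int[] simulate(int firings, int capacity) {
        AppendOnlySink appendOnly = new AppendOnlySink();
        BoundedSink bounded = new BoundedSink(capacity);
        for (int i = 0; i < firings; i++) {
            appendOnly.add("login", 1);
            bounded.add("login", 1);
        }
        return new int[] {appendOnly.size("login"), bounded.size("login")};
    }
}
```

`simulate(86_400, 60)` models one day of 1-second windows for a single key. The append-only state holds all 86,400 results, and the bounded one holds 60.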
Hi Nico,

Recently I've tried queryable state a bit differently, using ValueState with a util.ArrayList value and a ValueSerializer for util.ArrayList, and it works as expected.

The non-working example is here: https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a70a6fe2219
The working example is here: https://github.com/dawidwys/flink-intro/tree/master
(The QueryableJob is in the module flink-queryable-job and the QueryClient in flink-state-server.)

Sure, I am aware of the downside of ListState. I need it just for presentation purposes, but you may be right that there might not be any production use for this state and that it should be removed. Maybe the problem is just with ListState, and removing it would also resolve my problem :)

Regards
Dawid Wysakowicz

2017-01-13 18:50 GMT+01:00 Nico Kruber:
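Dawid's working variant stores the whole list as one value instead of as list state. The plain-Java sketch below shows why such a value is self-describing under an assumed count-prefixed layout, where the element count comes first and the elements follow. It is not the serializer Dawid used, and `CountPrefixedListCodec` is a hypothetical name. Because the count is written first, the reader knows exactly how many elements to read and does not rely on reading until the buffer runs out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical codec: the whole list is one value, prefixed with its element count. */
final class CountPrefixedListCodec {
    static byte[] encode(List<String> keys) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(keys.size());          // the reader learns up front how many elements follow
            for (String key : keys) {
                out.writeUTF(key);              // 2-byte length, then the characters
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);  // not expected for an in-memory stream
        }
    }

    static List<String> decode(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int size = in.readInt();
            List<String> result = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                result.add(in.readUTF());
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For `["login", "logout", "button"]`, the encoding is 4 bytes of count plus 7, 8 and 8 bytes for the strings (27 bytes in total), and `decode` returns the same three keys.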
Hey Dawid! I talked offline with Nico last week and he took this over.
He also suggested removing the list variant of queryable state altogether, which makes a lot of sense to me (at least in the current state of things). @Nico: could you open an issue for it?

Nico also found a difference between the job and your client code in the way you get the type information (classOf vs. manual type information), which could be the root of the initial problem.

– Ufuk

On Sat, Jan 14, 2017 at 2:03 PM, Dawid Wysakowicz wrote:
Hi Dawid,
regarding the original code, I couldn't reproduce this with the Java code I wrote. My guess is that the second parameter of the ListStateDescriptor is wrong:

    .asQueryableState(
      "type-time-series-count",
      new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
        "type-time-series-count",
        classOf[KeyedDataPoint[java.lang.Integer]]))

This should rather be

    TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})

as in the query itself. It seems strange to me that you don't get any ClassCastException or a compile-time error due to the type being wrong, but I lack the Scala knowledge to get to the bottom of this.

Regarding the removal of the queryable list state "sink", I created a JIRA issue for it and will open a PR: https://issues.apache.org/jira/browse/FLINK-5507

Nico

On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
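Nico's suggestion relies on the difference between a class literal and a type hint. On the JVM, `classOf[KeyedDataPoint[java.lang.Integer]]` (like `KeyedDataPoint.class` in Java) is the erased class, so the `Integer` argument is gone at run time. Scala still types the expression as `Class[KeyedDataPoint[java.lang.Integer]]`, which would explain why the `Class<T>` constructor compiles without complaint. A type hint is created as an anonymous subclass (the `() {}` in `new TypeHint[...]() {}`), and an anonymous subclass records its generic superclass, so the type argument survives. The sketch below shows only that JVM mechanism, using hypothetical `DataPoint` and `TypeToken` classes. It is not Flink's `TypeHint`, and it does not settle whether the difference changes which serializer Flink picks.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Hypothetical generic record standing in for KeyedDataPoint[T]. */
class DataPoint<T> {
    String key;
    long timestamp;
    T value;
}

/** Hypothetical "super type token": an anonymous subclass remembers its type argument. */
abstract class TypeToken<T> {
    final Type type;

    protected TypeToken() {
        // For `new TypeToken<DataPoint<Integer>>() {}` this is TypeToken<DataPoint<Integer>>.
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }
}

final class ErasureDemo {
    /** Through the class literal, the `value` field is only the unbound type variable T. */
    static Type valueTypeFromClassLiteral() {
        try {
            return DataPoint.class.getDeclaredField("value").getGenericType();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Through the token, the binding T = Integer is still available. */
    static Type valueTypeFromToken() {
        TypeToken<DataPoint<Integer>> token = new TypeToken<DataPoint<Integer>>() {};
        ParameterizedType captured = (ParameterizedType) token.type;
        return captured.getActualTypeArguments()[0];
    }
}
```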
Hi Nico, Ufuk,

Thanks for diving into this issue.

@Nico I don't think that's the problem. The code can be reproduced exactly in Java. I am using a different ListStateDescriptor constructor than you did.

You used:

    public ListStateDescriptor(String name, TypeInformation<T> typeInfo)

While I used:

    public ListStateDescriptor(String name, Class<T> typeClass)

I think the problem is the way I deserialize the value on the QueryClient side, as I tried to use:

    KvStateRequestSerializer.deserializeList(serializedResult, {
      TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
        .createSerializer(new ExecutionConfig)
    })

I have not checked it, but now I suspect this code would work:

    KvStateRequestSerializer.deserializeValue(serializedResult, {
      TypeInformation.of(new TypeHint[util.List[KeyedDataPoint[lang.Integer]]]() {})
        .createSerializer(new ExecutionConfig)
    })

Regarding removing the queryable list state, I agree; using it seems pointless. Moreover, while removing it I would take a second look at these functions:

    KvStateRequestSerializer.deserializeList
    KvStateRequestSerializer.serializeList

as I think they are not used at all, even right now.

Thanks for your time.

Regards
Dawid Wysakowicz

2017-01-16 13:25 GMT+01:00 Nico Kruber:
Hi Dawid,
sorry for the late reply. I was fixing some issues in queryable state and may now have found the cause of your error: you may be seeing a race condition with the MemoryStateBackend (the default state backend), as described here: https://issues.apache.org/jira/browse/FLINK-5642
I'm currently working on a fix.

KvStateRequestSerializer#deserializeList(), however, is the right function to deserialise list state; KvStateRequestSerializer#deserializeValue() will not work!

Thanks for the tip regarding KvStateRequestSerializer#serializeList. It was indeed unused, since the list state backends had their own serialisation function. We removed KvStateRequestSerializer#serializeList as well as the queryable list state sink for 1.2 and up.

Nico

On Monday, 16 January 2017 14:47:59 CET Dawid Wysakowicz wrote:
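Two assumed layouts, placed side by side, show why `deserializeValue()` with a `List` serializer cannot read list-state bytes. The example is plain Java with hypothetical names, and neither layout is claimed to be Flink's exact format:

* List state is written element by element, with a one-byte separator between elements.
* A `List` value is read by a serializer that expects an element count first.

A value-style reader takes the first four bytes of the list-state bytes as that count. For `["login", "logout"]` those bytes are `00 05 6C 6F` (the string length of `login`, then `l` and `o`), which gives a "count" of 355,439 for a 16-byte buffer. An element-by-element reader recovers both keys.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical comparison of two list layouts; neither is claimed to be Flink's exact format. */
final class ListVsValueLayout {
    /** List-state style (assumed): elements back to back, one separator byte in between. */
    static byte[] encodeSeparated(List<String> keys) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (int i = 0; i < keys.size(); i++) {
                if (i > 0) {
                    out.writeByte(',');
                }
                out.writeUTF(keys.get(i));
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** deserializeList-style reading: one element at a time until the bytes run out. */
    static List<String> readElementwise(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            List<String> result = new ArrayList<>();
            while (in.available() > 0) {
                result.add(in.readUTF());
                if (in.available() > 0) {
                    in.readByte(); // skip the separator
                }
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Value-style reading: a count-prefixed List reader would start by reading this int. */
    static int readCountPrefix(byte[] data) {
        try {
            return new DataInputStream(new ByteArrayInputStream(data)).readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```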
Hi Nico,

No problem at all, I've already presented my showcase with ValueStateDescriptor. Anyway, if I can help you with queryable state somehow, let me know. I will be happy to contribute some code.

2017-01-25 14:47 GMT+01:00 Nico Kruber: