Queryable state when key is UUID - getting Kyro Exception

classic Classic list List threaded Threaded
21 messages Options
12
Reply | Threaded
Open this post in threaded view
|

Queryable state when key is UUID - getting Kyro Exception

Jayant Ameta
I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:

CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

bupt_ljy

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:

CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

Jayant Ameta
Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:

CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

Tzu-Li (Gordon) Tai
Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

bupt_ljy
In reply to this post by Jayant Ameta

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

Jayant Ameta
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I am not using any custom serialize as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[hidden email]> wrote:

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

Jayant Ameta
Also, I haven't provided any custom serializer in my flink job. Shouldn't the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <[hidden email]> wrote:
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I am not using any custom serialize as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[hidden email]> wrote:

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

bupt_ljy
In reply to this post by Jayant Ameta

Hi,

   Can you show us the descriptor in the codes below?

    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",

        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);

Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Also, I haven't provided any custom serializer in my flink job. Shouldn't the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <[hidden email]> wrote:
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I am not using any custom serialize as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[hidden email]> wrote:

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

Jayant Ameta
MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
String.class);
Jayant Ameta


On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <[hidden email]> wrote:

Hi,

   Can you show us the descriptor in the codes below?

    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",

        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);

Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Also, I haven't provided any custom serializer in my flink job. Shouldn't the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <[hidden email]> wrote:
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I am not using any custom serialize as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[hidden email]> wrote:

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

Jayant Ameta
Hi Jiayi,
Any further help on this?

Jayant Ameta


On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <[hidden email]> wrote:
MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
String.class);
Jayant Ameta


On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <[hidden email]> wrote:

Hi,

   Can you show us the descriptor in the codes below?

    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",

        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);

Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Also, I haven't provided any custom serializer in my flink job. Shouldn't the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <[hidden email]> wrote:
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I am not using any custom serialize as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[hidden email]> wrote:

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

bupt_ljy
In reply to this post by Jayant Ameta

Hi, Jayant

    Your code looks good to me. And I’ve tried the serialize/deserialize of Kryo on UUID class, it all looks okay. 

    I’m not very sure about this problem. Maybe you can write a very simple demo to try if it works.


Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Monday, Oct 29, 2018 11:53
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jiayi,
Any further help on this?

Jayant Ameta


On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <[hidden email]> wrote:
MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
String.class);
Jayant Ameta


On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <[hidden email]> wrote:

Hi,

   Can you show us the descriptor in the codes below?

    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",

        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);

Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Also, I haven't provided any custom serializer in my flink job. Shouldn't the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <[hidden email]> wrote:
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I am not using any custom serialize as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[hidden email]> wrote:

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

Till Rohrmann
Hi Jayant, could you check that the UUID key on the TM is actually serialized using a Kryo serializer? You can do this by setting a breakpoint in the constructor of the `AbstractKeyedStateBackend`.

Cheers,
Till

On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <[hidden email]> wrote:

Hi, Jayant

    Your code looks good to me. And I’ve tried the serialize/deserialize of Kryo on UUID class, it all looks okay. 

    I’m not very sure about this problem. Maybe you can write a very simple demo to try if it works.


Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Monday, Oct 29, 2018 11:53
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jiayi,
Any further help on this?

Jayant Ameta


On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <[hidden email]> wrote:
MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
String.class);
Jayant Ameta


On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <[hidden email]> wrote:

Hi,

   Can you show us the descriptor in the codes below?

    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",

        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);

Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Also, I haven't provided any custom serializer in my flink job. Shouldn't the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <[hidden email]> wrote:
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I am not using any custom serialize as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[hidden email]> wrote:

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

Jayant Ameta
Yeah, it IS using Kryo serializer.

Jayant Ameta


On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <[hidden email]> wrote:
Hi Jayant, could you check that the UUID key on the TM is actually serialized using a Kryo serializer? You can do this by setting a breakpoint in the constructor of the `AbstractKeyedStateBackend`.

Cheers,
Till

On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <[hidden email]> wrote:

Hi, Jayant

    Your code looks good to me. And I’ve tried the serialize/deserialize of Kryo on UUID class, it all looks okay. 

    I’m not very sure about this problem. Maybe you can write a very simple demo to try if it works.


Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Monday, Oct 29, 2018 11:53
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jiayi,
Any further help on this?

Jayant Ameta


On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <[hidden email]> wrote:
MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
String.class);
Jayant Ameta


On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <[hidden email]> wrote:

Hi,

   Can you show us the descriptor in the codes below?

    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",

        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);

Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Also, I haven't provided any custom serializer in my flink job. Shouldn't the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <[hidden email]> wrote:
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I am not using any custom serialize as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[hidden email]> wrote:

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

Till Rohrmann
Could you send us a small example program which we can use to reproduce the problem?

Cheers,
Till

On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <[hidden email]> wrote:
Yeah, it IS using Kryo serializer.

Jayant Ameta


On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <[hidden email]> wrote:
Hi Jayant, could you check that the UUID key on the TM is actually serialized using a Kryo serializer? You can do this by setting a breakpoint in the constructor of the `AbstractKeyedStateBackend`.

Cheers,
Till

On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <[hidden email]> wrote:

Hi, Jayant

    Your code looks good to me. And I’ve tried the serialize/deserialize of Kryo on UUID class, it all looks okay. 

    I’m not very sure about this problem. Maybe you can write a very simple demo to try if it works.


Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Monday, Oct 29, 2018 11:53
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jiayi,
Any further help on this?

Jayant Ameta


On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <[hidden email]> wrote:
MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
String.class);
Jayant Ameta


On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <[hidden email]> wrote:

Hi,

   Can you show us the descriptor in the codes below?

    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",

        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);

Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Also, I haven't provided any custom serializer in my flink job. Shouldn't the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <[hidden email]> wrote:
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I am not using any custom serialize as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[hidden email]> wrote:

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

Jayant Ameta
Hi Till,
Here is the client snippet. Here Rule is a custom POJO that I use.

public static void main(String[] args)
throws IOException, InterruptedException, ExecutionException {
UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");

QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
ExecutionConfig config = new ExecutionConfig();
client.setExecutionConfig(config);

MapStateDescriptor<UUID, Rule> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
Rule.class);
CompletableFuture<MapState<UUID, Rule>> resultFuture =
client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
uuid, TypeInformation.of(UUID.class), descriptor);

while (!resultFuture.isDone()) {
Thread.sleep(1000);
}
resultFuture.whenComplete((result, throwable) -> {
if (throwable != null) {
throwable.printStackTrace();
} else {
try {
System.out.println(result.get(uuid));
} catch (Exception e) {
e.printStackTrace();
}
}
});
}

Below is the stack trace:

Caused by: java.lang.RuntimeException: Error while processing request with ID 12. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access.
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307)
at org.apache.flink.types.StringValue.readString(StringValue.java:770)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
... 9 more

at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563)
at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:748)

Jayant Ameta


On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann <[hidden email]> wrote:
Could you send us a small example program which we can use to reproduce the problem?

Cheers,
Till

On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <[hidden email]> wrote:
Yeah, it IS using Kryo serializer.

Jayant Ameta


On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <[hidden email]> wrote:
Hi Jayant, could you check that the UUID key on the TM is actually serialized using a Kryo serializer? You can do this by setting a breakpoint in the constructor of the `AbstractKeyedStateBackend`.

Cheers,
Till

On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <[hidden email]> wrote:

Hi, Jayant

    Your code looks good to me. And I’ve tried the serialize/deserialize of Kryo on UUID class, it all looks okay. 

    I’m not very sure about this problem. Maybe you can write a very simple demo to try if it works.


Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Monday, Oct 29, 2018 11:53
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jiayi,
Any further help on this?

Jayant Ameta


On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <[hidden email]> wrote:
MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
String.class);
Jayant Ameta


On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <[hidden email]> wrote:

Hi,

   Can you show us the descriptor in the codes below?

    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",

        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);

Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Also, I haven't provided any custom serializer in my flink job. Shouldn't the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <[hidden email]> wrote:
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I am not using any custom serialize as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[hidden email]> wrote:

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

bupt_ljy
In reply to this post by Jayant Ameta

Hi, Jayant

   The key you specified in getKvState function should be the key of the keyed stream instead of the key of the map. From what I’ve seen on https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html, this feature only supports managed keyed state.

   By the way, I think we should optimize the error messages with which what Jayant met.


Best,
Jiayi Liao

 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: trohrmann<[hidden email]>
Cc: bupt_ljy<[hidden email]>; Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Tuesday, Nov 13, 2018 13:39
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Till,
Here is the client snippet. Here Rule is a custom POJO that I use.

public static void main(String[] args)
throws IOException, InterruptedException, ExecutionException {
UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");

QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
ExecutionConfig config = new ExecutionConfig();
client.setExecutionConfig(config);

MapStateDescriptor<UUID, Rule> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
Rule.class);
CompletableFuture<MapState<UUID, Rule>> resultFuture =
client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
uuid, TypeInformation.of(UUID.class), descriptor);

while (!resultFuture.isDone()) {
Thread.sleep(1000);
}
resultFuture.whenComplete((result, throwable) -> {
if (throwable != null) {
throwable.printStackTrace();
} else {
try {
System.out.println(result.get(uuid));
} catch (Exception e) {
e.printStackTrace();
}
}
});
}

Below is the stack trace:

Caused by: java.lang.RuntimeException: Error while processing request with ID 12. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access.
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307)
at org.apache.flink.types.StringValue.readString(StringValue.java:770)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
... 9 more

at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563)
at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:748)

Jayant Ameta


On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann <[hidden email]> wrote:
Could you send us a small example program which we can use to reproduce the problem?

Cheers,
Till

On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <[hidden email]> wrote:
Yeah, it IS using Kryo serializer.

Jayant Ameta


On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <[hidden email]> wrote:
Hi Jayant, could you check that the UUID key on the TM is actually serialized using a Kryo serializer? You can do this by setting a breakpoint in the constructor of the `AbstractKeyedStateBackend`.

Cheers,
Till

On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <[hidden email]> wrote:

Hi, Jayant

    Your code looks good to me. And I’ve tried the serialize/deserialize of Kryo on UUID class, it all looks okay. 

    I’m not very sure about this problem. Maybe you can write a very simple demo to try if it works.


Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Monday, Oct 29, 2018 11:53
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jiayi,
Any further help on this?

Jayant Ameta


On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <[hidden email]> wrote:
MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
String.class);
Jayant Ameta


On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <[hidden email]> wrote:

Hi,

   Can you show us the descriptor in the codes below?

    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",

        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);

Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Also, I haven't provided any custom serializer in my flink job. Shouldn't the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <[hidden email]> wrote:
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I am not using any custom serialize as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[hidden email]> wrote:

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

Jayant Ameta
Thanks Jiayi,
I updated the client code to use keyed stream key. The key is a Tuple2<UUID, String>
CompletableFuture<MapState<UUID, Rule>> resultFuture =
client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
Tuple2.of(uuid, "test"), TypeInformation.of(new TypeHint<Tuple2<UUID, String>>() {
}), descriptor);
I'm now getting a different exception. I'm NOT using Avro as a customer serializer. Not sure what causes this issue.

Caused by: java.lang.RuntimeException: Error while processing request with ID 21. Caused by: java.lang.UnsupportedOperationException: Could not find required Avro dependency.
at org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Jayant Ameta


On Tue, Nov 13, 2018 at 11:35 AM bupt_ljy <[hidden email]> wrote:

Hi, Jayant

   The key you specified in getKvState function should be the key of the keyed stream instead of the key of the map. From what I’ve seen on https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html, this feature only supports managed keyed state.

   By the way, I think we should optimize the error messages with which what Jayant met.


Best,
Jiayi Liao

 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: trohrmann<[hidden email]>
Cc: bupt_ljy<[hidden email]>; Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Tuesday, Nov 13, 2018 13:39
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Till,
Here is the client snippet. Here Rule is a custom POJO that I use.

public static void main(String[] args)
throws IOException, InterruptedException, ExecutionException {
UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");

QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
ExecutionConfig config = new ExecutionConfig();
client.setExecutionConfig(config);

MapStateDescriptor<UUID, Rule> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
Rule.class);
CompletableFuture<MapState<UUID, Rule>> resultFuture =
client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
uuid, TypeInformation.of(UUID.class), descriptor);

while (!resultFuture.isDone()) {
Thread.sleep(1000);
}
resultFuture.whenComplete((result, throwable) -> {
if (throwable != null) {
throwable.printStackTrace();
} else {
try {
System.out.println(result.get(uuid));
} catch (Exception e) {
e.printStackTrace();
}
}
});
}

Below is the stack trace:

Caused by: java.lang.RuntimeException: Error while processing request with ID 12. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access.
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307)
at org.apache.flink.types.StringValue.readString(StringValue.java:770)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
... 9 more

at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563)
at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:748)

Jayant Ameta


On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann <[hidden email]> wrote:
Could you send us a small example program which we can use to reproduce the problem?

Cheers,
Till

On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <[hidden email]> wrote:
Yeah, it IS using Kryo serializer.

Jayant Ameta


On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <[hidden email]> wrote:
Hi Jayant, could you check that the UUID key on the TM is actually serialized using a Kryo serializer? You can do this by setting a breakpoint in the constructor of the `AbstractKeyedStateBackend`.

Cheers,
Till

On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <[hidden email]> wrote:

Hi, Jayant

    Your code looks good to me. And I’ve tried the serialize/deserialize of Kryo on UUID class, it all looks okay. 

    I’m not very sure about this problem. Maybe you can write a very simple demo to try if it works.


Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Monday, Oct 29, 2018 11:53
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jiayi,
Any further help on this?

Jayant Ameta


On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <[hidden email]> wrote:
MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
String.class);
Jayant Ameta


On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <[hidden email]> wrote:

Hi,

   Can you show us the descriptor in the codes below?

    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",

        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);

Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Also, I haven't provided any custom serializer in my flink job. Shouldn't the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <[hidden email]> wrote:
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I am not using any custom serialize as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[hidden email]> wrote:

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

bupt_ljy
In reply to this post by Jayant Ameta

Hi Jayant,

   I don’t know why flink uses the Avro serializer, which is usually used in POJO class, but from the error messages, I think you can add flink-avro as a dependency and try again.


Best,

Jiayi Liao


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: trohrmann<[hidden email]>; Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Tuesday, Nov 13, 2018 16:15
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Thanks Jiayi,
I updated the client code to use keyed stream key. The key is a Tuple2<UUID, String>
CompletableFuture<MapState<UUID, Rule>> resultFuture =
client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
Tuple2.of(uuid, "test"), TypeInformation.of(new TypeHint<Tuple2<UUID, String>>() {
}), descriptor);
I'm now getting a different exception. I'm NOT using Avro as a customer serializer. Not sure what causes this issue.

Caused by: java.lang.RuntimeException: Error while processing request with ID 21. Caused by: java.lang.UnsupportedOperationException: Could not find required Avro dependency.
at org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Jayant Ameta


On Tue, Nov 13, 2018 at 11:35 AM bupt_ljy <[hidden email]> wrote:

Hi, Jayant

   The key you specified in getKvState function should be the key of the keyed stream instead of the key of the map. From what I’ve seen on https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html, this feature only supports managed keyed state.

   By the way, I think we should optimize the error messages with which what Jayant met.


Best,
Jiayi Liao

 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: trohrmann<[hidden email]>
Cc: bupt_ljy<[hidden email]>; Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Tuesday, Nov 13, 2018 13:39
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Till,
Here is the client snippet. Here Rule is a custom POJO that I use.

public static void main(String[] args)
throws IOException, InterruptedException, ExecutionException {
UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");

QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
ExecutionConfig config = new ExecutionConfig();
client.setExecutionConfig(config);

MapStateDescriptor<UUID, Rule> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
Rule.class);
CompletableFuture<MapState<UUID, Rule>> resultFuture =
client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
uuid, TypeInformation.of(UUID.class), descriptor);

while (!resultFuture.isDone()) {
Thread.sleep(1000);
}
resultFuture.whenComplete((result, throwable) -> {
if (throwable != null) {
throwable.printStackTrace();
} else {
try {
System.out.println(result.get(uuid));
} catch (Exception e) {
e.printStackTrace();
}
}
});
}

Below is the stack trace:

Caused by: java.lang.RuntimeException: Error while processing request with ID 12. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access.
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307)
at org.apache.flink.types.StringValue.readString(StringValue.java:770)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
... 9 more

at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563)
at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:748)

Jayant Ameta


On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann <[hidden email]> wrote:
Could you send us a small example program which we can use to reproduce the problem?

Cheers,
Till

On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <[hidden email]> wrote:
Yeah, it IS using Kryo serializer.

Jayant Ameta


On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <[hidden email]> wrote:
Hi Jayant, could you check that the UUID key on the TM is actually serialized using a Kryo serializer? You can do this by setting a breakpoint in the constructor of the `AbstractKeyedStateBackend`.

Cheers,
Till

On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <[hidden email]> wrote:

Hi, Jayant

    Your code looks good to me. And I’ve tried the serialize/deserialize of Kryo on UUID class, it all looks okay. 

    I’m not very sure about this problem. Maybe you can write a very simple demo to try if it works.


Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Monday, Oct 29, 2018 11:53
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jiayi,
Any further help on this?

Jayant Ameta


On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <[hidden email]> wrote:
MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
String.class);
Jayant Ameta


On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <[hidden email]> wrote:

Hi,

   Can you show us the descriptor in the codes below?

    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",

        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);

Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Also, I haven't provided any custom serializer in my flink job. Shouldn't the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <[hidden email]> wrote:
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I am not using any custom serialize as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[hidden email]> wrote:

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

Jayant Ameta
Getting the same error even when I added flink-avro dependency to the client.

Jayant Ameta


On Tue, Nov 13, 2018 at 2:28 PM bupt_ljy <[hidden email]> wrote:

Hi Jayant,

   I don’t know why flink uses the Avro serializer, which is usually used in POJO class, but from the error messages, I think you can add flink-avro as a dependency and try again.


Best,

Jiayi Liao


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: trohrmann<[hidden email]>; Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Tuesday, Nov 13, 2018 16:15
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Thanks Jiayi,
I updated the client code to use keyed stream key. The key is a Tuple2<UUID, String>
CompletableFuture<MapState<UUID, Rule>> resultFuture =
client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
Tuple2.of(uuid, "test"), TypeInformation.of(new TypeHint<Tuple2<UUID, String>>() {
}), descriptor);
I'm now getting a different exception. I'm NOT using Avro as a customer serializer. Not sure what causes this issue.

Caused by: java.lang.RuntimeException: Error while processing request with ID 21. Caused by: java.lang.UnsupportedOperationException: Could not find required Avro dependency.
at org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Jayant Ameta


On Tue, Nov 13, 2018 at 11:35 AM bupt_ljy <[hidden email]> wrote:

Hi, Jayant

   The key you specified in getKvState function should be the key of the keyed stream instead of the key of the map. From what I’ve seen on https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html, this feature only supports managed keyed state.

   By the way, I think we should optimize the error messages with which what Jayant met.


Best,
Jiayi Liao

 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: trohrmann<[hidden email]>
Cc: bupt_ljy<[hidden email]>; Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Tuesday, Nov 13, 2018 13:39
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Till,
Here is the client snippet. Here Rule is a custom POJO that I use.

public static void main(String[] args)
throws IOException, InterruptedException, ExecutionException {
UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");

QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
ExecutionConfig config = new ExecutionConfig();
client.setExecutionConfig(config);

MapStateDescriptor<UUID, Rule> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
Rule.class);
CompletableFuture<MapState<UUID, Rule>> resultFuture =
client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
uuid, TypeInformation.of(UUID.class), descriptor);

while (!resultFuture.isDone()) {
Thread.sleep(1000);
}
resultFuture.whenComplete((result, throwable) -> {
if (throwable != null) {
throwable.printStackTrace();
} else {
try {
System.out.println(result.get(uuid));
} catch (Exception e) {
e.printStackTrace();
}
}
});
}

Below is the stack trace:

Caused by: java.lang.RuntimeException: Error while processing request with ID 12. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access.
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307)
at org.apache.flink.types.StringValue.readString(StringValue.java:770)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
... 9 more

at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563)
at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:748)

Jayant Ameta


On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann <[hidden email]> wrote:
Could you send us a small example program which we can use to reproduce the problem?

Cheers,
Till

On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <[hidden email]> wrote:
Yeah, it IS using Kryo serializer.

Jayant Ameta


On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <[hidden email]> wrote:
Hi Jayant, could you check that the UUID key on the TM is actually serialized using a Kryo serializer? You can do this by setting a breakpoint in the constructor of the `AbstractKeyedStateBackend`.

Cheers,
Till

On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <[hidden email]> wrote:

Hi, Jayant

    Your code looks good to me. And I’ve tried the serialize/deserialize of Kryo on UUID class, it all looks okay. 

    I’m not very sure about this problem. Maybe you can write a very simple demo to try if it works.


Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Monday, Oct 29, 2018 11:53
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jiayi,
Any further help on this?

Jayant Ameta


On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <[hidden email]> wrote:
MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
String.class);
Jayant Ameta


On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <[hidden email]> wrote:

Hi,

   Can you show us the descriptor in the codes below?

    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",

        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);

Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Also, I haven't provided any custom serializer in my flink job. Shouldn't the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <[hidden email]> wrote:
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I am not using any custom serialize as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[hidden email]> wrote:

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
Reply | Threaded
Open this post in threaded view
|

Re: Queryable state when key is UUID - getting Kyro Exception

Till Rohrmann
Hi Jayant,

could you maybe setup a small Github project with the client and server code? Otherwise it is really hard to reproduce the problem. Thanks a lot!

Cheers,
Till

On Tue, Nov 13, 2018 at 11:29 AM Jayant Ameta <[hidden email]> wrote:
Getting the same error even when I added flink-avro dependency to the client.

Jayant Ameta


On Tue, Nov 13, 2018 at 2:28 PM bupt_ljy <[hidden email]> wrote:

Hi Jayant,

   I don’t know why flink uses the Avro serializer, which is usually used in POJO class, but from the error messages, I think you can add flink-avro as a dependency and try again.


Best,

Jiayi Liao


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: trohrmann<[hidden email]>; Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Tuesday, Nov 13, 2018 16:15
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Thanks Jiayi,
I updated the client code to use keyed stream key. The key is a Tuple2<UUID, String>
CompletableFuture<MapState<UUID, Rule>> resultFuture =
client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
Tuple2.of(uuid, "test"), TypeInformation.of(new TypeHint<Tuple2<UUID, String>>() {
}), descriptor);
I'm now getting a different exception. I'm NOT using Avro as a customer serializer. Not sure what causes this issue.

Caused by: java.lang.RuntimeException: Error while processing request with ID 21. Caused by: java.lang.UnsupportedOperationException: Could not find required Avro dependency.
at org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Jayant Ameta


On Tue, Nov 13, 2018 at 11:35 AM bupt_ljy <[hidden email]> wrote:

Hi, Jayant

   The key you specified in getKvState function should be the key of the keyed stream instead of the key of the map. From what I’ve seen on https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html, this feature only supports managed keyed state.

   By the way, I think we should optimize the error messages with which what Jayant met.


Best,
Jiayi Liao

 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: trohrmann<[hidden email]>
Cc: bupt_ljy<[hidden email]>; Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Tuesday, Nov 13, 2018 13:39
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Till,
Here is the client snippet. Here Rule is a custom POJO that I use.

public static void main(String[] args)
throws IOException, InterruptedException, ExecutionException {
UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");

QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
ExecutionConfig config = new ExecutionConfig();
client.setExecutionConfig(config);

MapStateDescriptor<UUID, Rule> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
Rule.class);
CompletableFuture<MapState<UUID, Rule>> resultFuture =
client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
uuid, TypeInformation.of(UUID.class), descriptor);

while (!resultFuture.isDone()) {
Thread.sleep(1000);
}
resultFuture.whenComplete((result, throwable) -> {
if (throwable != null) {
throwable.printStackTrace();
} else {
try {
System.out.println(result.get(uuid));
} catch (Exception e) {
e.printStackTrace();
}
}
});
}

Below is the stack trace:

Caused by: java.lang.RuntimeException: Error while processing request with ID 12. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access.
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307)
at org.apache.flink.types.StringValue.readString(StringValue.java:770)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
... 9 more

at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563)
at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:748)

Jayant Ameta


On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann <[hidden email]> wrote:
Could you send us a small example program which we can use to reproduce the problem?

Cheers,
Till

On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <[hidden email]> wrote:
Yeah, it IS using Kryo serializer.

Jayant Ameta


On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <[hidden email]> wrote:
Hi Jayant, could you check that the UUID key on the TM is actually serialized using a Kryo serializer? You can do this by setting a breakpoint in the constructor of the `AbstractKeyedStateBackend`.

Cheers,
Till

On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <[hidden email]> wrote:

Hi, Jayant

    Your code looks good to me. And I’ve tried the serialize/deserialize of Kryo on UUID class, it all looks okay. 

    I’m not very sure about this problem. Maybe you can write a very simple demo to try if it works.


Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Monday, Oct 29, 2018 11:53
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jiayi,
Any further help on this?

Jayant Ameta


On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <[hidden email]> wrote:
MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class,
String.class);
Jayant Ameta


On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <[hidden email]> wrote:

Hi,

   Can you show us the descriptor in the codes below?

    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",

        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);

Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: bupt_ljy<[hidden email]>
Cc: Tzu-Li (Gordon) Tai<[hidden email]>; user<[hidden email]>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Also, I haven't provided any custom serializer in my flink job. Shouldn't the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <[hidden email]> wrote:
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I am not using any custom serialize as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[hidden email]> wrote:

Hi  Jayant,

  There should be a Serializer parameter in the constructor of the StateDescriptor, you should create a new serializer like this: 


   new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


 By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best



 Original Message 
Sender: Tzu-Li (Gordon) Tai<[hidden email]>
Recipient: Jayant Ameta<[hidden email]>; bupt_ljy<[hidden email]>
Cc: user<[hidden email]>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception

Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([hidden email]) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[hidden email]> wrote:

Hi,

   It seems that your codes are right. Are you sure that you’re using the same Serializer as the Flink program do? Could you show the serializer in descriptor? 



Jiayi Liao, Best


 Original Message 
Sender: Jayant Ameta<[hidden email]>
Recipient: user<[hidden email]>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta
12