Fail to recover Keyed State after ReinterpretAsKeyedStream

Jose Cisneros

Hi,

To avoid reshuffling in my job, I started using DataStreamUtils.reinterpretAsKeyedStream instead of doing another keyBy on the same key. The state backend is RocksDB.

When the job recovers after a failure, the ProcessFunction after the keyBy restores its keyed state correctly, while the ProcessFunction after reinterpretAsKeyedStream does not recover its keyed state.

I have checked the data written by the checkpoints, and it does contain a reference to the state.

If I use keyBy instead of DataStreamUtils.reinterpretAsKeyedStream, the keyed state is recovered as expected.

Is DataStreamUtils.reinterpretAsKeyedStream not intended to be used with keyed state?

Thank you.

Regards,

Jose

Re: Fail to recover Keyed State after ReinterpretAsKeyedStream

Tzu-Li (Gordon) Tai
Hi Jose,

As far as I know, you should be able to use keyed state on a stream returned by the DataStreamUtils.reinterpretAsKeyedStream function. That shouldn’t be the issue here.

Have you checked the logs for any exceptions that explain why the restore failed?
That would be helpful here to understand whether or not this is a bug.

Cheers,
Gordon
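
One thing that may be worth checking, offered as an assumption and not as a confirmed diagnosis of this thread: the Flink documentation warns that a stream passed to reinterpretAsKeyedStream must already be partitioned exactly the way keyBy would partition it. Flink maps each key to a key group and each key group to one subtask, and keyed state is restored per key group. The sketch below is a simplified, standalone model of that mapping. The class and method names are hypothetical, and keyGroupFor uses a plain modulo where Flink additionally applies a murmur hash to key.hashCode(), so the concrete numbers will not match a real job.

```java
import java.util.List;

public class KeyGroupSketch {

    // Simplified stand-in for Flink's key-group assignment (assumption:
    // Flink also murmur-hashes key.hashCode() before taking the modulo).
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Maps a key group to the index of the subtask that owns it.
    public static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Subtask that a keyBy on this key would route the record to (in this model).
    public static int expectedSubtask(Object key, int parallelism, int maxParallelism) {
        return subtaskFor(keyGroupFor(key, maxParallelism), parallelism, maxParallelism);
    }

    // True if a record observed on actualSubtask is where keyBy would have sent it.
    public static boolean isCorrectlyPartitioned(
            Object key, int actualSubtask, int parallelism, int maxParallelism) {
        return expectedSubtask(key, parallelism, maxParallelism) == actualSubtask;
    }

    public static void main(String[] args) {
        int parallelism = 4;        // hypothetical job parallelism
        int maxParallelism = 128;   // hypothetical max parallelism
        int observedSubtask = 0;    // subtask on which the records were actually seen

        for (Integer key : List.of(5, 40, 100)) {
            System.out.printf(
                "key=%d expectedSubtask=%d correctlyPartitioned=%b%n",
                key,
                expectedSubtask(key, parallelism, maxParallelism),
                isCorrectlyPartitioned(key, observedSubtask, parallelism, maxParallelism));
        }
    }
}
```

If records reach the reinterpreted operator on a subtask other than the one keyBy would have chosen, their keys fall outside that subtask's key-group range, which is one situation in which keyed state could appear to be missing after a restore. Whether that applies to this job can only be confirmed from the logs.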


On 24 October 2018 at 9:58:54 PM, Jose Cisneros ([hidden email]) wrote the original post above.