Flink 1.4: Queryable State Client

Seye Jin
I recently upgraded from Flink 1.3 to 1.4 and use the Queryable State client in my application. I have 1 JM and 5 TMs, all running behind Kubernetes. A large state is built and distributed evenly across the task managers, and the client can query the state for a specified key.

Issue: if a task manager dies and a new one is spun up automatically, the queryable state recovers successfully on the new nodes/task slots, but I start to get timeout exceptions when the client tries to query for a key, even if I reset or redeploy the client jobs.

I have been trying to triage this and find a way to remediate it. I found that KvStateClientProxyHandler, which is not exposed to user code, has a forceUpdate flag that can help reset the KvStateLocations (plus the InetAddresses), but it defaults to false and can't be overridden.
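
To make the stale-cache symptom concrete, here is a minimal sketch of a location cache with a force-update switch. It is not Flink's actual implementation: `LocationCache`, `lookup`, the resolver function, and the addresses are all hypothetical names chosen for illustration. It only shows why a lookup that never forces a refresh keeps returning the old address after the state has moved.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class LocationCacheDemo {

    // Hypothetical stand-in for a state-location cache; not a Flink class.
    static final class LocationCache {
        private final Map<String, String> cache = new ConcurrentHashMap<>();
        private final Function<String, String> resolver; // assumed: asks the cluster for the current address

        LocationCache(Function<String, String> resolver) {
            this.resolver = resolver;
        }

        // Returns the cached address unless forceUpdate is true or nothing is cached yet.
        String lookup(String stateName, boolean forceUpdate) {
            if (forceUpdate) {
                cache.remove(stateName);
            }
            return cache.computeIfAbsent(stateName, resolver);
        }
    }

    public static void main(String[] args) {
        // Mutable "cluster view": where the state currently lives (made-up addresses).
        Map<String, String> cluster = new ConcurrentHashMap<>();
        cluster.put("my-state", "10.0.0.1:9069");

        LocationCache cache = new LocationCache(cluster::get);
        System.out.println(cache.lookup("my-state", false)); // resolves and caches 10.0.0.1:9069

        // The task manager is replaced and the state recovers at a new address.
        cluster.put("my-state", "10.0.0.7:9069");

        System.out.println(cache.lookup("my-state", false)); // still 10.0.0.1:9069 (stale)
        System.out.println(cache.lookup("my-state", true));  // 10.0.0.7:9069 (refreshed)
    }
}
```

With the flag fixed at false, only the first two calls are possible, which matches the behavior described: queries keep going to an address that no longer serves the state.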

I was wondering if anyone knows how to remediate this kind of issue, or if there is a way to let the jobmanager know that the cached task manager location is no longer valid.
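
One generic way to tell a cache that a location is no longer valid is to invalidate on failure: drop the cached entry when a query against it fails, then resolve again. The sketch below illustrates that idea only. `CachingQuerier`, the resolver, and the transport function are hypothetical, and nothing here is claimed to be how Flink 1.4 behaves or can be configured.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class InvalidateOnFailureDemo {

    // Hypothetical client-side wrapper; none of these names are Flink APIs.
    static final class CachingQuerier {
        private final Map<String, String> locations = new ConcurrentHashMap<>();
        private final Function<String, String> resolver;  // assumed: fresh address lookup
        private final Function<String, String> transport; // assumed: sends a query to an address, may throw

        CachingQuerier(Function<String, String> resolver, Function<String, String> transport) {
            this.resolver = resolver;
            this.transport = transport;
        }

        String query(String stateName) {
            String address = locations.computeIfAbsent(stateName, resolver);
            try {
                return transport.apply(address);
            } catch (RuntimeException firstFailure) {
                // Treat the failure as a hint that the cached address is stale.
                locations.remove(stateName, address);
                String fresh = locations.computeIfAbsent(stateName, resolver);
                return transport.apply(fresh); // a second failure propagates to the caller
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> cluster = new ConcurrentHashMap<>();
        cluster.put("my-state", "tm-old");

        // The fake transport only answers at the address the cluster currently reports.
        CachingQuerier querier = new CachingQuerier(cluster::get, address -> {
            if (!address.equals(cluster.get("my-state"))) {
                throw new IllegalStateException("no server at " + address);
            }
            return "value served by " + address;
        });

        System.out.println(querier.query("my-state")); // value served by tm-old
        cluster.put("my-state", "tm-new");             // state moved after a restart
        System.out.println(querier.query("my-state")); // value served by tm-new
    }
}
```

The retry is limited to one attempt so that a genuinely unavailable state still surfaces as an error instead of looping.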

Any tips to resolve this would be appreciated (I can't downgrade to 1.3 or upgrade from 1.4).

Re: Flink 1.4: Queryable State Client

vino yang
Hi Seye,

It seems that you have conducted an in-depth analysis of this issue.
If you think it's a bug or needs improvement, please feel free to create a JIRA issue to track its status.

Thanks, vino.

Seye Jin <[hidden email]> wrote on Sunday, October 14, 2018, at 12:02 AM:

Re: Flink 1.4: Queryable State Client

Jörn Franke
In reply to this post by Seye Jin
You should file an issue. To check whether this really fixes your problem, one workaround could be to use reflection to make this method accessible and then call it (this is, of course, not something for production code). You can also try a newer Flink version.
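
A minimal sketch of the reflection approach, for experimentation only. The `Handler` class and its private `lookup` method are hypothetical stand-ins, not Flink's actual class or signature; the real member names and parameter types would have to be read from the Flink 1.4 source.

```java
import java.lang.reflect.Method;

final class ReflectionWorkaroundDemo {

    // Hypothetical stand-in for a library class with a non-public method.
    static final class Handler {
        private String lookup(String stateName, boolean forceUpdate) {
            return stateName + (forceUpdate ? " (refreshed)" : " (cached)");
        }
    }

    // Calls the private method reflectively; wraps reflection errors in an unchecked exception.
    static String callLookup(Handler handler, String stateName, boolean forceUpdate) {
        try {
            Method method = Handler.class.getDeclaredMethod("lookup", String.class, boolean.class);
            method.setAccessible(true); // suppresses the access check for this Method object
            return (String) method.invoke(handler, stateName, forceUpdate);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("reflective call failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(callLookup(new Handler(), "my-state", true)); // my-state (refreshed)
    }
}
```

On Java 9 and later, `setAccessible` can be refused for classes in named modules that do not open their package, so this kind of experiment may also need JVM flags depending on the setup.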

> On 13.10.2018 at 18:02, Seye Jin <[hidden email]> wrote:

Re: Flink 1.4: Queryable State Client

Kostas Kloudas
Hi Seye,

Thanks for digging into the problem.

As Vino and Jörn suggested, this looks like a bug, so please file a JIRA issue.
It would also be nice if you could post it here so that we can follow the related discussion.

Cheers,
Kostas

> On Oct 14, 2018, at 9:46 AM, Jörn Franke <[hidden email]> wrote: