late element and expired state

Aggarwal, Ajay

Hello,

 

I have some questions about best practices for dealing with ever-expanding state with keyBy(). My input stream will keep producing new keys, and I am using keyed state. How do I keep the total state size bounded? After reading the Flink documentation and some blogs, I am planning to use the following:

 

  • When I know I have seen the “last” element associated with a key, I can manually clear the state.
  • I can also set a TTL on the state so that it expires and is garbage collected (with the next full snapshot). This is useful when I never see the “last” element.

 

Is that the right strategy?

 

Also, if an element arrives late (after the state has been cleared), how do I detect that the state has been cleared or expired so that I can skip these late elements? Is there an API that gives a hint about cleared/expired state?

 

Thanks.

 

Ajay

Re: late element and expired state

Yun Tang
Hi Ajay,

From your description, I think watermarks[1], which indicate that all earlier events have arrived, might meet your needs to some extent. But this means you would have to use windows and event time in your streaming job.

If you don't want to introduce the concept of windows, I think you can use 'KeyedStateBackend#applyToAllKeys' to manually clear the target state when you see the "last" element, and record the name of the cleared state in a predefined operator state, so that late elements can be skipped. Just be careful not to let the list in operator state grow too large, e.g. keep only a fixed number of expired states.
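
The "keep only a fixed number" idea can be sketched in plain Java, independent of any Flink API. The class name ClearedKeyTracker and its methods are hypothetical and only model the bookkeeping; in a real job the record would live in operator state as described above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: remembers the most recently cleared keys, up to a fixed capacity. */
class ClearedKeyTracker<K> {
    private final Map<K, Boolean> cleared;

    ClearedKeyTracker(final int capacity) {
        // Insertion-ordered map that evicts its oldest entry once capacity is exceeded.
        this.cleared = new LinkedHashMap<K, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Call after the "last" element for a key was seen and its state was cleared. */
    void markCleared(K key) {
        cleared.put(key, Boolean.TRUE);
    }

    /** True if the key was cleared recently enough to still be remembered. */
    boolean isCleared(K key) {
        return cleared.containsKey(key);
    }

    int size() {
        return cleared.size();
    }
}
```

The trade-off of a bounded record is that once a key has been evicted from it, a late element for that key can no longer be told apart from a brand-new key.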


Best
Yun Tang

From: Aggarwal, Ajay <[hidden email]>
Sent: Tuesday, February 5, 2019 22:54
To: [hidden email]
Subject: late element and expired state
 


Re: late element and expired state

Congxian Qiu
In reply to this post by Aggarwal, Ajay
Hi Aggarwal,
Your strategy for limiting the total state is right. There is no API that gives the user a hint about cleared/expired state.
I think you can associate every key with two states: one is the TTL state, and the other is a “seen” state (ValueState<Boolean>) that tells you whether the key has arrived before.
If the key is in neither the TTL state nor the “seen” state, this is the first time it has arrived, so update both states. If the key is in both states, run your own logic. If the key is in the “seen” state but not in the TTL state, it has expired. There should be no situation in which the key is in the TTL state but not in the “seen” state.
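
The decision logic above can be modelled in plain Java as a minimal sketch. It does not use the Flink state API: the TTL state is simulated with a map of last-write timestamps and an explicit clock, and the names TwoStateTracker, Status and onElement are hypothetical. Refreshing the TTL on every write is also an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the two-state scheme: a TTL state plus a "seen" marker. */
class TwoStateTracker {
    enum Status { FIRST_SEEN, ACTIVE, EXPIRED }

    private final long ttlMillis;
    // Simulated TTL state: key -> time of last write (assumption: TTL refreshes on write).
    private final Map<String, Long> ttlState = new HashMap<>();
    // Simulated "seen" state: never expires.
    private final Set<String> seenState = new HashSet<>();

    TwoStateTracker(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    Status onElement(String key, long nowMillis) {
        Long lastWrite = ttlState.get(key);
        boolean inTtlState = lastWrite != null && nowMillis - lastWrite < ttlMillis;
        if (!inTtlState) {
            ttlState.remove(key); // drop the entry if it has expired
        }

        if (!seenState.contains(key)) {
            // In neither state: first arrival, so update both states.
            seenState.add(key);
            ttlState.put(key, nowMillis);
            return Status.FIRST_SEEN;
        }
        if (inTtlState) {
            // In both states: the key is live, so the caller runs its own logic.
            ttlState.put(key, nowMillis);
            return Status.ACTIVE;
        }
        // Seen before but no longer in the TTL state: the state has expired.
        return Status.EXPIRED;
    }
}
```

With a TTL of 1000 ms, an element for a key at time 0 is reported as FIRST_SEEN, one at time 500 as ACTIVE, and one at time 2000 as EXPIRED, so the caller can skip it as late.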

Best,
Congxian


On Tue, Feb 5, 2019 at 10:54 PM, Aggarwal, Ajay <[hidden email]> wrote:

