clear() in a ProcessWindowFunction


clear() in a ProcessWindowFunction

Vishal Santoshi
Hello folks,
The suggestion is to use windowState() for per-key, per-window state and to clear that state explicitly. It also seems that getRuntimeContext().getState() returns state scoped to the global window, i.e. state that is shared among all windows with the same key. I want state scoped to a key per window, so I wanted to use windowState(). The caveat is that my window is a session window (session windows are merging windows), and when I try to use clear() this exception is thrown:

Caused by: java.lang.UnsupportedOperationException: Per-window state is not allowed when using merging windows.

The questions are:

* How do I keep state per session window per key and still be able to clear it?
* Does getRuntimeContext().getState() give me the clear() semantics for free, along with state per window per key? In other words, have I misunderstood getRuntimeContext().getState()?
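
A plain-Java model can make the two scopes in these questions concrete. It is a sketch, not Flink code: the class StateScopes and its methods are hypothetical. State from getRuntimeContext().getState() behaves like a map keyed only by the stream key, while context.windowState() behaves like a map keyed by (key, window), so two session windows of the same key share one key-scoped value but have separate window-scoped values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of two keyed-state scopes; not Flink's implementation.
final class StateScopes {
    /** A window identified by [start, end), similar to Flink's TimeWindow. */
    record Window(long start, long end) {}

    /** Composite key for per-window state. */
    record KeyAndWindow(String key, Window window) {}

    // Like getRuntimeContext().getState(...): one value per key, shared by all of that key's windows.
    private final Map<String, String> keyScoped = new HashMap<>();
    // Like context.windowState().getState(...): one value per (key, window) pair.
    private final Map<KeyAndWindow, String> windowScoped = new HashMap<>();

    void putKeyScoped(String key, String value) {
        keyScoped.put(key, value);
    }

    String getKeyScoped(String key) {
        return keyScoped.get(key);
    }

    void putWindowScoped(String key, Window window, String value) {
        windowScoped.put(new KeyAndWindow(key, window), value);
    }

    String getWindowScoped(String key, Window window) {
        return windowScoped.get(new KeyAndWindow(key, window));
    }

    // Per-window state can be dropped when its window is cleared (cf. ProcessWindowFunction#clear).
    void clearWindow(String key, Window window) {
        windowScoped.remove(new KeyAndWindow(key, window));
    }
}
```

With merging windows, the window-scoped entries of two sessions would have to be combined whenever the sessions merge; the exception above shows that Flink does not allow windowState() in that case.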

Regards.




Re: clear() in a ProcessWindowFunction

Vishal Santoshi
Essentially, does this code leak state?

// Imports needed in the enclosing file.
// KeyedSession and KeyedSessionWithSessionID are my own types (not shown).
import java.util.UUID;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

private static class SessionIdProcessWindowFunction<KEY extends java.io.Serializable, VALUE extends java.io.Serializable>
        extends ProcessWindowFunction<KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow> {
    private static final long serialVersionUID = 1L;
    private static final ValueStateDescriptor<String> sessionId =
            new ValueStateDescriptor<>("session_uid", String.class);

    @Override
    public void process(KEY key,
                        Context context,
                        Iterable<KeyedSession<KEY, VALUE>> elements,
                        Collector<KeyedSessionWithSessionID<KEY, VALUE>> out) throws Exception {
        // I need this scoped to key/window
        ValueState<String> state = getRuntimeContext().getState(sessionId);
        if (state.value() == null) {
            // First element seen for this state: create the ID.
            state.update(UUID.randomUUID().toString());
        }
        // Tag the (single, because of the count-1 purging trigger) element with the stored ID.
        out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(), state.value()));
    }
}

On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi wrote:

Re: clear() in a ProcessWindowFunction

Roman Khachatryan
Hi Vishal,

There is no leak in the code you provided (except that the number of keys can grow).
But as you figured out, the state is scoped to the key, not to window+key.

Could you explain what you are trying to achieve and why you need to combine session windows with state scoped to window+key?
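
Roman's point can be illustrated with a plain-Java model of the process() method above (hypothetical class name; the map stands in for the key-scoped "session_uid" ValueState and is not Flink's implementation). Because the state is scoped to the key, a later session of the same key reads back the ID created by an earlier one, and one entry per distinct key is retained until something clears it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical plain-Java model of the process() method above; the map stands in for the
// key-scoped "session_uid" ValueState. Not Flink code.
final class KeyScopedSessionIds {
    private final Map<String, String> sessionIdByKey = new HashMap<>();

    /** Like process(): create an ID only if the key has none yet, then return the stored ID. */
    String idFor(String key) {
        // computeIfAbsent replaces the value() == null check followed by update().
        return sessionIdByKey.computeIfAbsent(key, k -> UUID.randomUUID().toString());
    }

    /** Entries retained so far; nothing here ever removes them (no TTL, no clear()). */
    int retainedKeys() {
        return sessionIdByKey.size();
    }
}
```

Two consequences follow in this model: sessions of the same key are not distinguished by the ID, and the number of retained entries grows with the number of distinct keys.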

Regards,
Roman

On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi wrote:

Re: clear() in a ProcessWindowFunction

Vishal Santoshi
I intend to augment every event in a session with a unique ID. To keep the session lean, there is a PurgingTrigger on this aggregate that fires on a count of 1.

> (except that the number of keys can grow).

I want to confirm: are the keys garbage-collected (along with their state) once the window closes and the allowed lateness has elapsed?
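
In the DataStream API, the trigger described above would be attached with something like .window(EventTimeSessionWindows.withGap(...)).trigger(PurgingTrigger.of(CountTrigger.of(1))); event time and the gap value are assumptions here. The following plain-Java model (a hypothetical class, not Flink's trigger implementation) sketches the effect on the window contents: each element fires the window function once and is then purged, so the window never buffers more than one element. Only the window contents are purged; key-scoped state obtained through getRuntimeContext() is not touched by the trigger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a window whose trigger is PurgingTrigger.of(CountTrigger.of(1)):
// every added element fires the window function, and the contents are purged right after.
final class PurgeOnEveryElementWindow<T> {
    private final List<T> contents = new ArrayList<>();
    private final Consumer<List<T>> windowFunction;
    private int firings = 0;

    PurgeOnEveryElementWindow(Consumer<List<T>> windowFunction) {
        this.windowFunction = windowFunction;
    }

    void add(T element) {
        contents.add(element);
        // CountTrigger.of(1): the count threshold is reached on every element, so it fires.
        windowFunction.accept(List.copyOf(contents));
        firings++;
        // PurgingTrigger turns FIRE into FIRE_AND_PURGE: the window contents are cleared.
        contents.clear();
    }

    int bufferedElements() {
        return contents.size();
    }

    int firings() {
        return firings;
    }
}
```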



On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan wrote:

Re: clear() in a ProcessWindowFunction

Vishal Santoshi
Sometimes writing it down makes you think. I now realize that this is not the right approach, given that merging windows have their own state, and how the merge happens is really at the key level.



On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi wrote:

Re: clear() in a ProcessWindowFunction

Roman Khachatryan
> Want to confirm that the keys are GCed (along with state) once the (windows close + lateness)?
Window state is cleared (as is the window itself), but global state is not, unless you use TTL [1].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
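
In Flink, TTL is enabled on the state descriptor before the state is obtained, for example sessionId.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(1)).build()), where Time is org.apache.flink.api.common.time.Time and the one-hour value is an arbitrary example. The semantics can be sketched with a plain-Java model (hypothetical class; it mimics the default OnCreateAndWrite update type and NeverReturnExpired visibility, and it is not Flink's implementation): a write restarts the timer, and a value older than the TTL reads as absent.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of key-scoped state with a time-to-live, mimicking StateTtlConfig's default
// UpdateType.OnCreateAndWrite and StateVisibility.NeverReturnExpired. Not Flink's implementation.
// Time is passed in explicitly so the behavior is deterministic.
final class TtlKeyedState<V> {
    private record Entry<T>(T value, long lastWriteMillis) {}

    private final Map<String, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;

    TtlKeyedState(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void update(String key, V value, long nowMillis) {
        // OnCreateAndWrite: creating or overwriting the value restarts its TTL; reads do not.
        entries.put(key, new Entry<>(value, nowMillis));
    }

    V value(String key, long nowMillis) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (nowMillis - entry.lastWriteMillis() >= ttlMillis) {
            // NeverReturnExpired: an expired value reads as absent and is dropped on access.
            entries.remove(key);
            return null;
        }
        return entry.value();
    }

    int size() {
        return entries.size();
    }
}
```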

Regards,
Roman

On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi wrote:

Re: clear() in a ProcessWindowFunction

Vishal Santoshi
Yep, makes sense. 

On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan wrote:

Re: clear() in a ProcessWindowFunction

Vishal Santoshi
I have a question. Say I have a single key with two live sessions (A and B) and a configured allowed lateness.

Do these invariants hold?

* The state is scoped to the key (created per key in the ProcessWindowFunction, with a TTL).
* The state remains alive regardless of whether the window is closed or not (TTL takes care of collecting it).
* Execution for a key is sequential: if two events arrive for the two sessions, they are processed one after the other (in any order, but without the need for synchronization).
* State mutated by an event in session A will be visible to session B if an event for session B arrives afterwards. There is no need to synchronize access to the state, as it is for the same key.

What I am not sure about is what happens when session A merges with session B. I assume Flink just defines a new start and end for the merged window, GCs the old windows (or at least one of them), and assigns the event to the new window. As for the custom state in the ProcessWindowFunction (there is a CountTrigger of 1), i.e. what is done in the process method above: that state is one degree removed from whatever Flink does internally with its merges, given that it is scoped to the key.
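
The merge behavior described here can be sketched with a plain-Java, single-key model of session windows (hypothetical class, loosely following EventTimeSessionWindows rather than reproducing Flink's merging code): each event opens [timestamp, timestamp + gap), overlapping or touching windows are merged into one covering window, and the key-scoped session ID lives outside the windows, so merging never touches it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical single-key model of session-window merging, loosely following
// EventTimeSessionWindows; not Flink's merging code.
final class SessionMergeModel {
    /** A session window [start, end). */
    record Window(long start, long end) {
        // Like Flink's TimeWindow#intersects, windows that merely touch also count.
        boolean intersects(Window other) {
            return start <= other.end && other.start <= end;
        }

        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    private final long gap;
    private final List<Window> windows = new ArrayList<>();
    // Stands in for key-scoped state from getRuntimeContext().getState(...); merges never touch it.
    String keyScopedSessionId;

    SessionMergeModel(long gap) {
        this.gap = gap;
    }

    /** Assigns an event to a window, merging every existing window it overlaps or touches. */
    Window add(long timestamp) {
        Window merged = new Window(timestamp, timestamp + gap);
        List<Window> untouched = new ArrayList<>();
        for (Window w : windows) {
            if (w.intersects(merged)) {
                merged = merged.cover(w); // the old window is absorbed into the merged one
            } else {
                untouched.add(w);
            }
        }
        untouched.add(merged);
        untouched.sort(Comparator.comparingLong(Window::start));
        windows.clear();
        windows.addAll(untouched);
        return merged;
    }

    List<Window> windows() {
        return List.copyOf(windows);
    }
}
```

For example, with a gap of 10, events at 0 and 18 open two sessions, [0, 10) and [18, 28); a later event at 9 bridges them into a single window [0, 28), while the key-scoped ID stays unchanged.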

On Fri, Mar 12, 2021 at 12:37 PM Vishal Santoshi wrote:

Re: clear() in a ProcessWindowFunction

Roman Khachatryan
Hi Vishal,

Sorry for the late reply. Please find my answers below.
By state I assume you mean the state obtained via getRuntimeContext() (access to window state is not allowed with merging windows).

> The state is scoped to the key (created per key in the ProcessWindowFunction with a ttl)
Yes.

> The state will remain alive irrespective of whether the Window is closed or not (a TTL timer does the collection)
Right, but you need to configure TTL on the state descriptor before accessing the state [1].

> The execution on a key is sequential, as in if 2 events arrive for the 2 Sessions they happen sequentially (or in any order but without the need of synchronization)
Right.

> The state mutated by an event in Session A, will be visible to Session B if an event incident on Session B was to happen subsequently. There is no need of synchronizing access to the state as it for the same key.
Right.

Your understanding of how window contents are merged is also correct.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl

Regards,
Roman


On Wed, Mar 31, 2021 at 5:45 PM Vishal Santoshi
<[hidden email]> wrote:

>
> I had a query Say I have a single key with 2 live sessions ( A and B )  with a configured lateness .
>
> Do these invariants hold?
>
> * The state is scoped to the key (created per key in the ProcessWindowFunction with a ttl )
> * The state will remain alive irrespective of whether the Window is closed or not (a TTL timer does the collection )
> *  The execution on a key is sequential , as in if 2 events arrive for the 2 Sessions they happen sequentially ( or in any order but without the need of synchronization )
> * The state mutated by an event in Session A, will be visible to Session B if an event incident on Session B was to happen subsequently.  There is no need of synchronizing access to the state as it for the same key.
>
> What I am not sure about is what happens when session A merge with session B. I would assume that it just is defining new start and end of the merged window, Gcing the old ones ( or at least one of them ) and assigning that even to that new window. What one does with the custom state in ProcessWindowFunction ( there is a CountTrigger of 1 ) ,  really what is done in the process method above,  As in this state is 1 degree removed from what ever flink does internally with it's merges given that the state is scoped to the key.
>
>
>
>
>
>
>
> On Fri, Mar 12, 2021 at 12:37 PM Vishal Santoshi <[hidden email]> wrote:
>>
>> Yep, makes sense.
>>
>> On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan <[hidden email]> wrote:
>>>
>>> > Want to confirm that the keys are GCed ( along with state ) once the  (windows close + lateness ) ?
>>> Window state is cleared (as well as the window itself), but global
>>> state is not (unless you use TTL).
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>>>
>>> Regards,
>>> Roman
>>>
>>> On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi
>>> <[hidden email]> wrote:
>>> >
>>> > Sometimes writing it down makes you think. I now realize that this is not the right approach, given that merging windows will have their own states..and how the merge happens is really at the key level....
>>> >
>>> >
>>> >
>>> > On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi <[hidden email]> wrote:
>>> >>
>>> >> I intend to augment every event in a session  with a unique ID.  To keep the session lean, there is a PurgingTrigger on this aggregate that  fires on a count of 1.
>>> >>
>>> >> >> (except that the number of keys can grow).
>>> >>
>>> >> Want to confirm that the keys are GCed ( along with state ) once the  (windows close + lateness ) ?
>>> >>
>>> >>
>>> >>
>>> >> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan <[hidden email]> wrote:
>>> >>>
>>> >>> Hi Vishal,
>>> >>>
>>> >>> There is no leak in the code you provided (except that the number of
>>> >>> keys can grow).
>>> >>> But as you figured out the state is scoped to key, not to window+key.
>>> >>>
>>> >>> Could you explain what you are trying to achieve and why do you need to combine
>>> >>> sliding windows with state scoped to window+key?
>>> >>>
>>> >>> Regards,
>>> >>> Roman
>>> >>>
>>> >>> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
>>> >>> <[hidden email]> wrote:
>>> >>> >
>>> >>> > Essentially, Does this code leak state
>>> >>> >
>>> >>> > private static class SessionIdProcessWindowFunction<KEY extends java.io.Serializable, VALUE extends java.io.Serializable>
>>> >>> > extends
>>> >>> > ProcessWindowFunction<KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow> {
>>> >>> > private static final long serialVersionUID = 1L;
>>> >>> > private final static ValueStateDescriptor<String> sessionId = new ValueStateDescriptor<String>("session_uid",
>>> >>> > String.class);
>>> >>> >
>>> >>> > @Override
>>> >>> > public void process(KEY key,
>>> >>> > ProcessWindowFunction<KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow>.Context context,
>>> >>> > Iterable<KeyedSession<KEY, VALUE>> elements, Collector<KeyedSessionWithSessionID<KEY, VALUE>> out)
>>> >>> > throws Exception {
>>> >>> > // I need this scoped to key/window
>>> >>> > if (getRuntimeContext().getState(sessionId).value() == null) {
>>> >>> > UUID uuid = UUID.randomUUID();
>>> >>> > getRuntimeContext().getState(sessionId).update(uuid.toString());
>>> >>> > }
>>> >>> > String uuid = getRuntimeContext().getState(sessionId).value();
>>> >>> > out.collect(new KeyedSessionWithSessionID<>(elements.iterator().next(), uuid));
>>> >>> > }
>>> >>> > }
>>> >>> >
>>> >>> > On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi <[hidden email]> wrote:
>>> >>> >>
>>> >>> >> Hello folks,
>>> >>> >>                   The suggestion is to use windowState() for a key key per window state and clear the state explicitly.  Also it seems that getRuntime().getState() will return a globalWindow() where state is shared among windows with the same key. I desire of course to have state scoped to a key per window and was wanting to use windowState().. The caveat is that my window is a Session Window and when I try to use clear()  I am thrown this exception  ( Session Windows are Merging Windows )
>>> >>> >>
>>> >>> >> Caused by: java.lang.UnsupportedOperationException: Per-window state is not allowed when using merging windows.
>>> >>> >>
>>> >>> >>
>>> >>> >> The questions are:
>>> >>> >>
>>> >>> >> * How do I get state per session window and per key, and still be able to clear it?
>>> >>> >> * Does getRuntimeContext().getState() give me the clear() semantics for free, along with state per window per key, and have I therefore misunderstood getRuntimeContext().getState()?
>>> >>> >>
>>> >>> >> Regards.
>>> >>> >>
>>> >>> >>
>>> >>> >>
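
The scoping question can be illustrated with a small plain-Java model. It is a sketch under stated assumptions, not Flink code: `StateScopeModel`, its two `HashMap`s and the string-encoded window scope are hypothetical stand-ins for Flink's keyed state backend. Under those assumptions, the key-scoped variant (which mirrors the quoted `process()`) hands the same UUID to every session window of a key, while the key+window-scoped variant gives each window its own value that can be cleared on its own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Plain-Java model, NOT Flink's API: contrasts key-scoped state (what
// getRuntimeContext().getState() gives inside a window function) with
// key+window-scoped state (what per-window state would give).
final class StateScopeModel {
    // One value per key, shared by every window of that key.
    private final Map<String, String> keyScoped = new HashMap<>();
    // One value per (key, window) pair; the window is encoded into the map key.
    private final Map<String, String> windowScoped = new HashMap<>();

    private static String scope(String key, long start, long end) {
        return key + "@[" + start + "," + end + ")";
    }

    // Same logic as the quoted process(): create a UUID on first access, then reuse it.
    String sessionIdKeyScoped(String key) {
        return keyScoped.computeIfAbsent(key, k -> UUID.randomUUID().toString());
    }

    // Identical logic, but the window bounds are part of the scope.
    String sessionIdWindowScoped(String key, long start, long end) {
        return windowScoped.computeIfAbsent(scope(key, start, end), s -> UUID.randomUUID().toString());
    }

    // Clearing per-window state removes only that window's entry.
    void clearWindow(String key, long start, long end) {
        windowScoped.remove(scope(key, start, end));
    }

    int keyScopedEntries() { return keyScoped.size(); }

    int windowScopedEntries() { return windowScoped.size(); }
}
```

For example, two calls to `sessionIdKeyScoped("user-1")` return the same string even when they stand for two different sessions of `user-1`.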

Re: clear() in a ProcessWindowFunction

Vishal Santoshi
Thank you for the confirmation. The simulations confirm it too.

On Fri, Apr 9, 2021 at 11:14 AM Roman Khachatryan <[hidden email]> wrote:
Hi Vishal,

Sorry for the late reply. Please find my answers below.
By state I assume you mean the state obtained via getRuntimeContext() (access to
window state is not allowed with merging windows).

> The state is scoped to the key (created per key in the ProcessWindowFunction with a TTL).
Yes.

> The state will remain alive irrespective of whether the window is closed or not (a TTL timer does the collection).
Right, but you need to configure TTL on the state descriptor when creating the state [1].

> The execution on a key is sequential, as in if 2 events arrive for the 2 sessions, they are processed sequentially (in any order, but without the need for synchronization).
Right.

> The state mutated by an event in session A will be visible to session B if an event for session B happens subsequently. There is no need to synchronize access to the state, as it is for the same key.
Right.
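
The last two invariants can be sketched in plain Java. This is only a model, not Flink's runtime: `SequentialKeyModel` and its `Event` type are hypothetical, and a single loop stands in for the one task that owns a key. An update made while handling an event from session A is simply visible to the next event from session B, with no synchronization involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model, NOT Flink internals: all events of one key are handled
// one after another by the same task, so key-scoped state needs no locking.
final class SequentialKeyModel {
    static final class Event {
        final String key;
        final String session;
        final int amount;

        Event(String key, String session, int amount) {
            this.key = key;
            this.session = session;
            this.amount = amount;
        }
    }

    // Key-scoped running total, shared by every session of the same key.
    private final Map<String, Integer> totalPerKey = new HashMap<>();
    // What each event observed before updating the state, in processing order.
    private final List<String> log = new ArrayList<>();

    // Events are processed strictly in the given order; none run concurrently.
    void processAll(List<Event> events) {
        for (Event e : events) {
            int before = totalPerKey.getOrDefault(e.key, 0);
            totalPerKey.put(e.key, before + e.amount);
            log.add(e.session + " saw " + before);
        }
    }

    List<String> log() { return log; }

    int total(String key) { return totalPerKey.getOrDefault(key, 0); }
}
```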

Your understanding of merging of window contents is also correct.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl

Regards,
Roman
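
The TTL point can be illustrated with a plain-Java model. In Flink itself, TTL is configured by building a `StateTtlConfig` and passing it to the state descriptor's `enableTimeToLive(...)`; the class below is only a hypothetical sketch of the resulting behavior under assumed settings (the TTL is refreshed on create and write, and expired values are never returned), with an explicit `nowMillis` argument in place of a real clock. As far as the linked TTL documentation describes it, Flink removes expired entries when they are read or through its configured background cleanup strategies rather than with a timer per entry; the lazy removal here only approximates that.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model, NOT Flink code: key-scoped state whose entries expire a
// fixed time after their last write. Assumed semantics: writes reset the TTL,
// and expired values are never returned (they are dropped when next read).
final class TtlValueStateModel {
    private static final class Entry {
        final String value;
        final long lastWriteMillis;

        Entry(String value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<String, Entry> byKey = new HashMap<>();

    TtlValueStateModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Writing a value (re)starts the TTL for that key.
    void update(String key, String value, long nowMillis) {
        byKey.put(key, new Entry(value, nowMillis));
    }

    // Returns null for missing or expired entries; expired ones are removed lazily.
    String value(String key, long nowMillis) {
        Entry e = byKey.get(key);
        if (e == null) {
            return null;
        }
        if (nowMillis - e.lastWriteMillis >= ttlMillis) {
            byKey.remove(key);
            return null;
        }
        return e.value;
    }

    int size() {
        return byKey.size();
    }
}
```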


On Wed, Mar 31, 2021 at 5:45 PM Vishal Santoshi
<[hidden email]> wrote:
>
> I have a query. Say I have a single key with 2 live sessions (A and B) with a configured allowed lateness.
>
> Do these invariants hold?
>
> * The state is scoped to the key (created per key in the ProcessWindowFunction with a TTL).
> * The state will remain alive irrespective of whether the window is closed or not (a TTL timer does the collection).
> * The execution on a key is sequential, as in if 2 events arrive for the 2 sessions, they are processed sequentially (in any order, but without the need for synchronization).
> * The state mutated by an event in session A will be visible to session B if an event for session B happens subsequently. There is no need to synchronize access to the state, as it is for the same key.
>
> What I am not sure about is what happens when session A merges with session B. I would assume that Flink just defines a new start and end for the merged window, GCs the old ones (or at least one of them), and assigns the event to that new window. As for the custom state in the ProcessWindowFunction (there is a CountTrigger of 1), i.e. what is done in the process method above: this state is one degree removed from whatever Flink does internally with its merges, given that it is scoped to the key.
>
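
The merge behavior described in this question can be sketched in plain Java. This is a simplified model, not Flink's `MergingWindowAssigner`: `SessionMergeModel` is hypothetical, windows are half-open `[start, end)` intervals, each event at time `t` opens `[t, t + gap)`, and windows that overlap or touch are replaced by one window spanning both. Key-scoped state from `getRuntimeContext()` does not appear in this step at all, which matches the observation that it sits one step removed from the merge.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model, NOT Flink code: session windows as [t, t + gap) intervals
// that are merged whenever they overlap or touch.
final class SessionMergeModel {
    static final class Window {
        final long start;
        final long end; // exclusive

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + "," + end + ")";
        }
    }

    static List<Window> merge(List<Long> timestamps, long gap) {
        // Every event first gets its own window.
        List<Window> windows = new ArrayList<>();
        for (long t : timestamps) {
            windows.add(new Window(t, t + gap));
        }
        windows.sort(Comparator.comparingLong(w -> w.start));

        // Sweep in start order, folding each window into the previous one if they meet.
        List<Window> merged = new ArrayList<>();
        for (Window w : windows) {
            if (!merged.isEmpty() && w.start <= merged.get(merged.size() - 1).end) {
                Window last = merged.remove(merged.size() - 1);
                merged.add(new Window(last.start, Math.max(last.end, w.end)));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }
}
```

With a gap of 10, events at 0, 15 and 30 give three separate sessions; a late event at 22 bridges the last two into `[15,40)`.
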
> On Fri, Mar 12, 2021 at 12:37 PM Vishal Santoshi <[hidden email]> wrote:
>>
>> Yep, makes sense.
>>
>> On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan <[hidden email]> wrote:
>>>
>>> > Want to confirm: are the keys GCed (along with their state) once the windows close (plus allowed lateness)?
>>> Window state is cleared (as well as the window itself), but global
>>> state is not (unless you use TTL [1]).
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>>>
>>> Regards,
>>> Roman
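
The difference between window state and global state can be modeled in plain Java. Assumption, to be checked against the Flink version in use: for event-time windows, a window's cleanup time is its last timestamp (`end - 1`) plus the allowed lateness, and its state is dropped once the watermark reaches that time. `WindowCleanupModel` and its methods are hypothetical; key-scoped entries are deliberately never touched by the watermark, mirroring "global state is not [cleared] (unless you use TTL)".

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model, NOT Flink code: window-scoped state disappears when the
// watermark reaches the window's cleanup time; key-scoped state does not.
final class WindowCleanupModel {
    // Assumed event-time rule: last timestamp of [start, end) plus allowed lateness.
    static long cleanupTime(long windowEnd, long allowedLateness) {
        return (windowEnd - 1) + allowedLateness;
    }

    // Window id -> time at which its state is dropped.
    private final Map<String, Long> windowCleanupAt = new HashMap<>();
    // Key-scoped ("global") state; never removed by the watermark.
    private final Map<String, String> keyScoped = new HashMap<>();

    void registerWindow(String windowId, long windowEnd, long allowedLateness) {
        windowCleanupAt.put(windowId, cleanupTime(windowEnd, allowedLateness));
    }

    void putKeyScoped(String key, String value) {
        keyScoped.put(key, value);
    }

    // Drop every window whose cleanup time the watermark has reached.
    void advanceWatermark(long watermark) {
        windowCleanupAt.values().removeIf(t -> t <= watermark);
    }

    int liveWindows() { return windowCleanupAt.size(); }

    int keyScopedEntries() { return keyScoped.size(); }
}
```
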
>>>
>>> On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi
>>> <[hidden email]> wrote:
>>> >
>>> >>> > Sometimes writing it down makes you think. I now realize that this is not the right approach, given that merging windows will have their own state, and how the merge happens is really at the key level.
>>> >
>>> >
>>> >
>>> > On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi <[hidden email]> wrote:
>>> >>
>>> >>> >> I intend to augment every event in a session with a unique ID. To keep the session lean, there is a PurgingTrigger on this aggregate that fires on a count of 1.
>>> >>
>>> >> >> (except that the number of keys can grow).
>>> >>
>>> >>> >> Want to confirm: are the keys GCed (along with their state) once the windows close (plus allowed lateness)?
>>> >>
>>> >>
>>> >>
>>> >> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan <[hidden email]> wrote:
>>> >>>
>>> >>> Hi Vishal,
>>> >>>
>>> >>> There is no leak in the code you provided (except that the number of
>>> >>> keys can grow).
>>> >>> >>> But as you figured out, the state is scoped to the key, not to window+key.
>>> >>> >>>
>>> >>> >>> Could you explain what you are trying to achieve and why you need to combine
>>> >>> >>> session windows with state scoped to window+key?
>>> >>>
>>> >>> Regards,
>>> >>> Roman
>>> >>>