# of active session windows of a streaming job

Dongwon Kim
Hi,

It may be a totally stupid question, but I currently have no idea how to get the number of active session windows from a running job.

Our traffic trajectory application (which handles up to 10,000 tps) uses event-time session windows on a KeyedStream (keyed by userID).

Should I write another Flink job for this purpose?

Cheers,

Dongwon Kim

Re: # of active session windows of a streaming job

Fabian Hueske
Hi Dongwon Kim,

That's an interesting question.

I don't have a solution blueprint for you, but a few ideas that should help to solve the problem.

I would start with a separate job first and later try to integrate it with the other job.
You could implement a Trigger that fires when a new window is created and when the window is closed. A ProcessWindowFunction would emit +1 when a window is created and -1 when a window is closed.
Session windows are a bit special because you also need to handle merging windows, i.e., two open windows can be merged so that only one (the merged) window is closed. So you would need to emit -2 when a merged window is closed (assuming only two windows were merged).
To do that, you'd need to carry the merging information forward. The Trigger.onMerge() method cannot trigger the window function, but it could store the merging information in state that is accessed later.
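The +1/-1/-2 bookkeeping can be sketched as a small plain-Python model (not Flink code). It assumes a hypothetical stream of window lifecycle events `("create", w)`, `("merge", result, inputs)` and `("close", w)`. The `absorbed` dict stands in for the merge information that `Trigger.onMerge()` would store in state, and the yielded values are what the ProcessWindowFunction would send downstream.

```python
from typing import Iterable, Iterator, Tuple


def window_deltas(events: Iterable[Tuple]) -> Iterator[int]:
    """Yield +1 when a window is created and -k when a window is closed.

    k is the number of originally created windows that the closed window
    absorbed through merging (k == 1 for a window that was never merged).
    """
    absorbed = {}  # window id -> number of original windows it represents
    for event in events:
        kind = event[0]
        if kind == "create":
            absorbed[event[1]] = 1
            yield 1
        elif kind == "merge":
            _, result, inputs = event
            # Carry the merge information forward (what onMerge() would keep in state).
            # The right-hand side is evaluated first, so `result` may also be in `inputs`.
            absorbed[result] = sum(absorbed.pop(w) for w in inputs)
        elif kind == "close":
            yield -absorbed.pop(event[1])
        else:
            raise ValueError(f"unknown event kind: {kind!r}")


if __name__ == "__main__":
    events = [
        ("create", "a"),
        ("create", "b"),
        ("merge", "ab", ["a", "b"]),  # a record bridges the two open sessions
        ("close", "ab"),              # the merged window closes
    ]
    print(list(window_deltas(events)))  # [1, 1, -2]
```

In this model the running sum of the deltas stays at 2 between the merge and the close, although only one session is open, so it matches the true count only after merged windows have closed. The later replies in this thread move the decrement to merge time.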

Hope this helps,
Fabian

Re: # of active session windows of a streaming job

Dongwon Kim
Hi Fabian,

Thanks for the suggestion!

Currently we are just measuring finished sessions for a quick prototype: 

I'll try your suggestion and let you know :-)

Thanks,

- Dongwon 

Re: # of active session windows of a streaming job

Dongwon Kim
In reply to this post by Fabian Hueske
Hi Fabian,

I'm still eager to expose the number of active sessions as a key metric of our service, but I haven't figured out how yet.

First of all, I want to ask you some questions about your suggestion.
> You could implement a Trigger that fires when a new window is created and when the window is closed. A ProcessWindowFunction would emit +1 when a window is created and -1 when a window is closed.
> Session windows are a bit special because you also need to handle merging windows, i.e., two open windows can be merged so that only one (the merged) window is closed. So you would need to emit -2 when a merged window is closed (assuming only two windows were merged).
Q1)
How do I fire when a new window is created and when a window is closed?
AFAIK, we can return a TriggerResult only from three methods: onElement, onEventTime, and onProcessingTime.
Q2)
Firing passes the elements of a window to the window function; it does not emit values like +1, -1, and -2, which are not elements of the window.
Or am I missing something you meant?
> To do that, you'd need to carry the merging information forward. The Trigger.onMerge() method cannot trigger the window function, but it could store the merging information in state that is accessed later.
Q3)
I don't understand this part at all. What do you mean by carrying the merging information forward?

Besides your suggestion, I implemented a custom trigger that is almost the same as EventTimeTrigger except for the following:
- it maintains a variable that counts sessions in an instance of a window operator
- it increments the variable by 1 when onElement is invoked
- it decrements the variable by 1 when onClose is invoked
Considering the logic of Flink's session windows, it correctly counts the sessions in an instance of a window operator.

As you might have already noticed, this approach has a critical problem: there's no way to maintain operator state inside a trigger.
TriggerContext only allows interacting with state that is scoped to the window and the key of the current trigger invocation (as shown in Trigger#TriggerContext).

Now I've come to the conclusion that it might not be possible using the DataStream API.
Or do I need to think in a totally different way to achieve the goal?

Best,

- Dongwon



Re: # of active session windows of a streaming job

Fabian Hueske
Hi Dongwon,

Do you need the number of active session windows as a DataStream, or would you like to have it as a metric that you can expose?
If possible, I would recommend exposing it as a metric, because metrics are usually easier to collect.

SessionWindows work internally as follows:
- every new record is added to a new window that starts at the timestamp of the record and ends at timestamp + gap size. When a record is added to a window, Trigger.onElement() is called.
- after a window is created, the session window assigner tries to merge windows with overlapping ranges. When windows are merged, Trigger.onMerge() is called.

In order to track how many session windows exist, we would need to increment a counter by one when a new window is created (or when an element is assigned to a window, which is equivalent for session windows) and, when windows are merged, decrement the counter by the number of merged windows minus one.

Incrementing the counter is rather easy and can be done in Trigger.onElement(), either by using state or a Counter metric (Triggers have access to the metric system).
However, decrementing the counter is difficult. Although the Trigger.onMerge() method is called, it does not know how many windows were merged (the merging is done by the WindowAssigner); it only sees the merged window. There might be a way to maintain state in a Trigger that allows inferring how many windows were merged.
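The mechanism and the counting rule above can be illustrated with a minimal plain-Python model for a single key (not Flink code; the class name `SessionWindowsModel` is hypothetical). Each record opens a window `[ts, ts + gap)`, overlapping windows are merged, and `counter` follows the rule above: +1 per record, minus the number of merged windows minus one. The model assumes that windows which merely touch are merged too, and it ignores window expiry. Note that the rule needs `len(merged)`, which is exactly what an onMerge() hook does not see.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # [start, end) in event time


class SessionWindowsModel:
    """Session windows for a single key, with the counter rule from the text."""

    def __init__(self, gap: int) -> None:
        self.gap = gap
        self.windows: List[Window] = []  # open, mutually non-overlapping windows
        self.counter = 0

    def add(self, ts: int) -> Window:
        """Assign a record with timestamp `ts` and return the window it ends up in."""
        new = (ts, ts + self.gap)
        # Assumption: windows that overlap or touch the new window are merged with it.
        overlapping = [w for w in self.windows if w[0] <= new[1] and new[0] <= w[1]]
        merged = overlapping + [new]
        result = (min(w[0] for w in merged), max(w[1] for w in merged))
        self.windows = [w for w in self.windows if w not in overlapping] + [result]
        # +1 for the record's new window, then -(number of merged windows - 1).
        # An onMerge() hook would only see `result`, not len(merged).
        self.counter += 1 - (len(merged) - 1)
        return result


if __name__ == "__main__":
    s = SessionWindowsModel(gap=10)
    s.add(0)          # opens (0, 10)
    s.add(15)         # opens (15, 25)
    print(s.add(8))   # (8, 18) bridges both sessions -> (0, 25)
    print(s.counter, len(s.windows))  # 1 1
```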

Best, Fabian

Re: # of active session windows of a streaming job

Dongwon Kim
Hi Fabian,
Thanks a lot for your reply.

> Do you need the number of active session windows as a DataStream, or would you like to have it as a metric that you can expose?
> If possible, I would recommend exposing it as a metric, because metrics are usually easier to collect.
I want to have it as a metric, and it doesn't look difficult thanks to the metric system exposed by TriggerContext.

> In order to track how many session windows exist, we would need to increment a counter by one when a new window is created (or when an element is assigned to a window, which is equivalent for session windows)
I agree that we need to increment a counter when Trigger.onElement() is called, given the characteristics of session windows.

> and, when windows are merged, decrement the counter by the number of merged windows minus one.
You suggest decrementing the counter when windows are merged, but I think we also need to decrement it when a window expires.

> However, decrementing the counter is difficult. Although the Trigger.onMerge() method is called, it does not know how many windows were merged (the merging is done by the WindowAssigner); it only sees the merged window.
We assume that the timestamps of records from a user are in ascending order, so only one window is closed at a time, which simplifies the problem of how to decrement the counter.
Nevertheless, I think I need to decrement the counter in Trigger.onClose(), not Trigger.onMerge().
By doing that in Trigger.onClose(), we can take care of both cases: when a window is merged and when a window expires.
What do you think?

The reason I mention state is to calculate the exact number of active sessions even after my Flink application is restarted from checkpoints or savepoints.
If we restore from a savepoint and the counter is initialized to 0, we'll see an incorrect value on our dashboard.
This is my biggest concern at this point.

Best,

- Dongwon


Re: # of active session windows of a streaming job

Fabian Hueske
Hi Dongwon,

You are of course right! We need to decrement the counter when the window is closed.

The idea of using the Trigger.clear() method (the clean-up method is called clear(), not onClose()) is great!
It will be called when the window is closed but also when it is merged.
So, I think you are right and we only need to increment the counter in Trigger.onElement() and decrement it in Trigger.clear().
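The agreed rule (+1 in onElement(), -1 in clear()) can be checked against a plain-Python model of a merging session-window operator (not Flink code; `ActiveSessionCounterModel` and its methods are hypothetical stand-ins). Two points are assumptions about the operator's behavior, not something confirmed in this thread: the hook order on a merge (clear() for each pre-existing window absorbed into the result, then onElement() for the result), and cleanup once the watermark reaches `end - 1` with zero allowed lateness.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Window = Tuple[int, int]  # [start, end) in event time


class ActiveSessionCounterModel:
    """Plain-Python model of a merging window operator with a session counter."""

    def __init__(self, gap: int) -> None:
        self.gap = gap
        self.windows: Dict[str, List[Window]] = defaultdict(list)  # key -> open windows
        self.counter = 0  # plays the role of the metric

    # Stand-ins for the Trigger hooks (arguments mirror the hooks, unused here).
    def on_element(self, key: str, window: Window) -> None:
        self.counter += 1

    def clear(self, key: str, window: Window) -> None:
        self.counter -= 1

    def process(self, key: str, ts: int) -> None:
        new = (ts, ts + self.gap)
        open_windows = self.windows[key]
        overlapping = [w for w in open_windows if w[0] <= new[1] and new[0] <= w[1]]
        result = (min([new[0]] + [w[0] for w in overlapping]),
                  max([new[1]] + [w[1] for w in overlapping]))
        for w in overlapping:
            if w != result:  # a pre-existing window absorbed into `result`
                self.clear(key, w)
        self.windows[key] = [w for w in open_windows if w not in overlapping] + [result]
        self.on_element(key, result)

    def advance_watermark(self, watermark: int) -> None:
        for key, open_windows in self.windows.items():
            # Assumption: max timestamp is end - 1 and allowed lateness is 0.
            expired = [w for w in open_windows if w[1] - 1 <= watermark]
            for w in expired:
                self.clear(key, w)
                open_windows.remove(w)

    def open_window_count(self) -> int:
        return sum(len(ws) for ws in self.windows.values())


if __name__ == "__main__":
    op = ActiveSessionCounterModel(gap=10)
    op.process("u1", 0)
    op.process("u1", 5)       # extends u1's session: clear((0, 10)), then on_element((0, 15))
    op.process("u2", 3)
    print(op.counter, op.open_window_count())  # 2 2
    op.advance_watermark(13)  # u2's (3, 13) expires
    print(op.counter, op.open_window_count())  # 1 1
```

In this model, a record whose window falls entirely inside an existing session reaches `on_element()` without any `clear()`, so the counter would drift upward for repeated or out-of-order timestamps. The strictly ascending-timestamp assumption stated earlier in the thread avoids that case.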

I'm not 100% sure, but I doubt that metrics can be checkpointed. Chesnay (in CC) would know.
Not sure what would be the best approach if you need a fault tolerant solution.

Best, Fabian




Re: # of active session windows of a streaming job

Chesnay Schepler
Checkpointing of metrics is a manual process.
The operator must write the current value into state, retrieve it on restore and restore the counter's count.
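Chesnay's recipe can be sketched in plain Python for a single parallel instance (not Flink code). `SimpleCounter` mimics a metrics counter that supports only increments and decrements and starts at 0 whenever the job starts. `snapshot_state` and `restore_state` are hypothetical names for the hooks of an operator or function that can write to checkpointed state. As noted earlier in the thread, a Trigger cannot do this.

```python
class SimpleCounter:
    """Stand-in for a metrics counter: starts at 0, supports inc/dec only."""

    def __init__(self) -> None:
        self._count = 0

    def inc(self, n: int = 1) -> None:
        self._count += n

    def dec(self, n: int = 1) -> None:
        self._count -= n

    def get_count(self) -> int:
        return self._count


class CheckpointedActiveSessions:
    """Keeps an active-session counter consistent across a restore (hypothetical)."""

    def __init__(self) -> None:
        # A fresh counter is registered every time the job (re)starts.
        self.metric = SimpleCounter()

    def snapshot_state(self) -> int:
        # Write the current value into checkpointed state.
        return self.metric.get_count()

    def restore_state(self, saved: int) -> None:
        # On restore, re-apply the saved value to the fresh counter.
        self.metric.inc(saved)


if __name__ == "__main__":
    before = CheckpointedActiveSessions()
    before.metric.inc(3)  # three sessions opened
    before.metric.dec(1)  # one session closed
    saved = before.snapshot_state()

    after = CheckpointedActiveSessions()  # job restarted from the checkpoint
    print(after.metric.get_count())       # 0: the wrong value Dongwon describes
    after.restore_state(saved)
    print(after.metric.get_count())       # 2
```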

On 20.06.2018 12:10, Fabian Hueske wrote:
Hi Dongwon,

You are of course right! We need to decrement the counter when the window is closed.

The idea of using Trigger.clear() (the clean up method is called clear() instead of onClose()) method is great!
It will be called when the window is closed but also when it is merged.
So, I think you are right and we only need to increment the counter in Trigger.onElement() and decrement in Trigger.clear().

I'm not 100% sure, but I doubt that metrics can checkpointed. Chesnay (in CC) would know that.
Not sure what would be the best approach if you need a fault tolerant solution.

Best, Fabian




2018-06-19 16:38 GMT+02:00 Dongwon Kim <[hidden email]>:
Hi Fabian,
Thanks a lot for your reply.

Do you need to number of active session windows as a DataStream or would you like to have it as a metric that you can expose.
I possible, I would recommend to expose it as a metric because they are usually easier to collect.
I want to have it as a metric and it doesn't look difficult thanks to the metric system exposed by TriggerContext.

In order to track how many session windows exist, we would need to increment a counter by one when a new window is created (or an element is assigned to a window, which is equivalent for session windows)
I agree with you that we need to increment a counter when Trigger.onElement() is called due to the characteristic of session windows.

and decrement the counter when windows are merged by the number of merged windows minus one.
You decrement the counter when windows are merged, but I think we need to decrement the counter when a window is expired as well.

However, decrementing the counter is difficult. Although the Trigger.onMerge() method is called, it does not know how many windows were merged (which is done by the WindowAssigner) and only sees the merged window. 
We assume that timestamps of records from a user are in ascending order, so only one window is closed at a time which simplifies the problem of how to decrement the counter.
Nevertheless, I think I need to decrement the counter in Trigger.onClose(), not Trigger.onMerge().
By doing that in Trigger.onClose(), we can take care of both cases: when a window is merged and when a window is expired.
How do you think about it?

The reason I mention state is to calculate the exact number of active sessions even after my Flink application is restarted from checkpoints or savepoints.
If we restore from a savepoint and the counter is initialized to 0, we'll see an incorrect value from a dashboard.
This is the biggest concern of mine at this point.

Best,

- Dongwon


On Tue, Jun 19, 2018 at 7:14 PM, Fabian Hueske <[hidden email]> wrote:
Hi Dongwon,

Do you need to number of active session windows as a DataStream or would you like to have it as a metric that you can expose.
I possible, I would recommend to expose it as a metric because they are usually easier to collect.

SessionWindows work internally as follows:
- every new record is added to a new window that starts at the timestamp of the record and ends at timestamp + gap size. When a record is added to a window, Trigger.onElement() is called.
- after a window was created, the session window assigner tries to merge window with overlapping ranges. When windows are merged, Trigger.onMerge() is called.

In order to track how many session windows exist, we would need to increment a counter by one when a new window is created (or an element is assigned to a window, which is equivalent for session windows) and decrement the counter when windows are merged by the number of merged windows minus one.

Incrementing the counter is rather easy and can be done in Trigger.onElement(), either by using state or a Counter metric (Triggers have access to the metric system).
However, decrementing the counter is difficult. Although the Trigger.onMerge() method is called, it does not know how many windows were merged (which is done by the WindowAssigner) and only sees the merged window. There might be a way to maintain state in a Trigger that allows inferring how many windows were merged.
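One possible reading of the hint in the last sentence, sketched in plain Java: give every window a per-window value of 1, let the merge combine those values by summation, and have the merge hook read the sum from the merged window alone. `MergeCountInference` and its methods are hypothetical, and whether Flink's trigger state can be merged this way in practice is not established in this thread; the sketch only shows that the merged value is enough to infer how many windows were merged.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Flink API). Each window keeps a value of 1; a merge
// sums the values of the merged windows into the result window, and the hook
// infers the number of merged windows from that single merged value.
final class MergeCountInference {
    private final Map<String, Long> windowState = new HashMap<>(); // window id -> value
    private long activeSessions;

    void createWindow(String window) {
        windowState.put(window, 1L);
        activeSessions++;
    }

    // "Framework" side: combines the per-window values by summation, then calls
    // the hook with nothing but the merged window.
    void mergeWindows(List<String> mergedWindows, String result) {
        long sum = 0;
        for (String w : mergedWindows) {
            Long value = windowState.remove(w);
            if (value != null) {
                sum += value;
            }
        }
        windowState.put(result, sum);
        onMerge(result);
    }

    // Hook side: like Trigger.onMerge(), it only sees the merged window.
    private void onMerge(String result) {
        long mergedCount = windowState.get(result);
        activeSessions -= mergedCount - 1;
        windowState.put(result, 1L); // the result now stands for one session
    }

    void expire(String window) {
        if (windowState.remove(window) != null) {
            activeSessions--;
        }
    }

    long activeSessions() {
        return activeSessions;
    }
}
```

For example, merging three windows leaves a stored value of 3 on the merged window, so the hook subtracts 2 and resets the merged window's value to 1.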

Best, Fabian

2018-06-16 16:39 GMT+02:00 Dongwon Kim <[hidden email]>:
Hi Fabian,

I'm still eager to expose the number of active sessions as a key metric of our service, but I haven’t figured it out yet.

First of all, I want to ask you some questions regarding your suggestion.
You could implement a Trigger that fires when a new window is created and when the window is closed. A ProcessWindowFunction would emit a +1 if the window was created and a -1 when the window is closed.
Session windows are a bit special, because you also need to handle the case of merging windows, i.e., two opened windows can be merged and only one (the merged) window is closed. So you would need to emit a -2 if a merged window was closed (assuming only two windows were merged).
Q1)
How can a trigger fire when a new window is created and when a window is closed?
AFAIK, we can return a TriggerResult only from three methods: onElement, onEventTime, and onProcessingTime.
Q2)
Firing emits the elements of a window to the window function; it does not emit values like +1, -1, and -2, which are not in the window.
Or am I missing something you meant?
In order to do that, you'd need to carry the merging information forward. The Trigger.onMerge method cannot trigger the window function, but it could store the merging information in state that is later accessed.
Q3)
I don't understand this at all. What do you mean by carrying the merging information forward?
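Leaving aside how the deltas would be produced (questions Q1 to Q3 above), the downstream half of the suggestion is a running sum. A hypothetical plain-Java model, assuming +1 is emitted for each created window and -n when a window made of n original windows is closed:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the delta idea (not Flink API): +1 is emitted when a
// window is created, -n when a window that absorbed n windows is closed, and a
// downstream operator sums the deltas. How the deltas are produced is left open.
final class DeltaStreamModel {
    private final List<Long> emitted = new ArrayList<>();

    void windowCreated() {
        emitted.add(1L);
    }

    // absorbed = number of original windows that make up the closed window
    void windowClosed(int absorbed) {
        emitted.add((long) -absorbed);
    }

    // Downstream side: the running sum over all deltas emitted so far.
    long runningSum() {
        long sum = 0;
        for (long delta : emitted) {
            sum += delta;
        }
        return sum;
    }
}
```

Because nothing is emitted at merge time, the sum in this model still reports 2 after two windows have merged, even though only one session exists; the correction arrives only when the merged window closes.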

Apart from your suggestion, I implemented a custom trigger that is almost the same as EventTimeTrigger, except for the following:
- it maintains a variable that counts sessions in an instance of a window operator
- it increases the variable by 1 when onElement is invoked
- it decreases the variable by 1 when onClose is invoked
Given the logic of Flink’s session windows, it correctly counts the sessions in an instance of a window operator.
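Because the counting variable described above lives in each parallel instance of the window operator, it only covers the keys routed to that instance. A hypothetical sketch of the resulting bookkeeping follows; the hash-based routing is a simplification and not Flink's actual key-group assignment. In this sketch, a job-wide figure is the sum of the per-instance values.

```java
// Hypothetical sketch (not Flink API): one session counter per parallel
// window-operator instance, each covering only the keys routed to it.
final class PerInstanceCounters {
    private final long[] counters;

    PerInstanceCounters(int parallelism) {
        counters = new long[parallelism];
    }

    // Simplified routing by hash; Flink's real key-group assignment differs.
    int instanceFor(String key) {
        return Math.floorMod(key.hashCode(), counters.length);
    }

    void sessionOpened(String key) {
        counters[instanceFor(key)]++;
    }

    void sessionClosed(String key) {
        counters[instanceFor(key)]--;
    }

    long perInstance(int instance) {
        return counters[instance];
    }

    // Job-wide value: the sum over all instances.
    long total() {
        long sum = 0;
        for (long c : counters) {
            sum += c;
        }
        return sum;
    }
}
```

For example, with parallelism 4, opening sessions for u1, u2, and u3 and then closing u2's leaves a total of 2, however the three keys happen to be spread over the instances.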

As you might have already noticed, this approach has a critical problem: there's no way to maintain operator state inside a trigger.
TriggerContext only allows interacting with state that is scoped to the window and the key of the current trigger invocation (as described in Trigger#TriggerContext).

I have now come to the conclusion that it might not be possible with the DataStream API.
Or do I need to approach the problem in a totally different way?

Best,

- Dongwon



On Feb 20, 2018, at 6:53 PM, Fabian Hueske <[hidden email]> wrote:








Re: # of active session windows of a streaming job

Dongwon Kim-2
Hi Fabian and Chesnay,

As Chesnay pointed out, it seems that I need to write the current counter (which is defined inside the Trigger) into state, which I think should be the operator state of the window operator.
However, as I said before, TriggerContext only allows users to access partitioned state that is scoped to the key and the window of the current Trigger invocation.
There's no way for me to access the operator state of the window operator through TriggerContext.
Partitioned state doesn't seem suitable, as we have more than ten million keys.
That many keys could overwhelm Flink's metric system as well as external metric systems such as Ganglia and Prometheus.

Ideally, I want to achieve the goal with the current API (I'm using Flink 1.4.2) without modifications.
However, a change to TriggerContext seems unavoidable, because it would have to expose an additional method that lets users like me access the operator state of the window operator.

Thank you guys for the useful discussion.

P.S. Fabian, yes, you're right: it is Trigger.clear(), not Trigger.onClose().

Best,
- Dongwon


On Wed, Jun 20, 2018 at 7:30 PM, Chesnay Schepler <[hidden email]> wrote:
Checkpointing of metrics is a manual process.
The operator must write the current value into state, retrieve it on restore and restore the counter's count.
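The manual process described above can be modeled in plain Java. `CheckpointedCounter` is hypothetical and not part of Flink's metrics API: `snapshotState()` stands in for writing the value into operator state during a checkpoint, and `restart()` stands in for restoring that value into a freshly created counter.

```java
// Hypothetical model of checkpointing a metric by hand (not Flink API):
// the operator copies the counter's value into checkpointed state, and after a
// restore it applies the stored value to the freshly created counter.
final class CheckpointedCounter {
    private long count;        // live counter, recreated at 0 on restart
    private Long checkpointed; // stands in for the operator's checkpointed state

    void inc() { count++; }

    void dec() { count--; }

    long getCount() { return count; }

    // Called when a checkpoint is taken.
    void snapshotState() {
        checkpointed = count;
    }

    // Called after a failure: a new counter starts at 0, then the stored value is restored.
    void restart() {
        count = 0;
        if (checkpointed != null) {
            count += checkpointed;
        }
    }
}
```

Changes made after the last checkpoint are discarded by the restore. Assuming a replayable source, the records behind those changes are processed again, so the counter catches up instead of drifting. For example, three increments, a checkpoint, one more increment, and a restart leave the counter at 3, and it reaches 4 again once the replayed record is processed.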


On 20.06.2018 12:10, Fabian Hueske wrote:
Hi Dongwon,

You are of course right! We need to decrement the counter when the window is closed.

The idea of using the Trigger.clear() method (the clean-up method is called clear(), not onClose()) is great!
It is called when the window is closed, but also when it is merged.
So, I think you are right, and we only need to increment the counter in Trigger.onElement() and decrement it in Trigger.clear().

I'm not 100% sure, but I doubt that metrics can be checkpointed. Chesnay (in CC) would know.
I'm not sure what the best approach would be if you need a fault-tolerant solution.

Best, Fabian




2018-06-19 16:38 GMT+02:00 Dongwon Kim <[hidden email]>:
Hi Fabian,
Thanks a lot for your reply.

Do you need the number of active session windows as a DataStream, or would you like to have it as a metric that you can expose?
If possible, I would recommend exposing it as a metric because metrics are usually easier to collect.
I want to have it as a metric, and that doesn't look difficult thanks to the metric system exposed by TriggerContext.

In order to track how many session windows exist, we would need to increment a counter by one when a new window is created (or an element is assigned to a window, which is equivalent for session windows)
I agree that, given the characteristics of session windows, we need to increment a counter when Trigger.onElement() is called.

and, when windows are merged, decrement the counter by the number of merged windows minus one.
You decrement the counter when windows are merged, but I think we also need to decrement it when a window expires.

However, decrementing the counter is difficult. Although the Trigger.onMerge() method is called, it does not know how many windows were merged (which is done by the WindowAssigner) and only sees the merged window.
We assume that the timestamps of records from a given user arrive in ascending order, so only one window is closed at a time, which simplifies the problem of how to decrement the counter.
Nevertheless, I think I need to decrement the counter in Trigger.onClose(), not Trigger.onMerge().
By doing it in Trigger.onClose(), we can handle both cases: when a window is merged and when a window expires.
What do you think?

I mention state because I want to calculate the exact number of active sessions even after my Flink application is restarted from a checkpoint or savepoint.
If we restore from a savepoint and the counter is initialized to 0, the dashboard will show an incorrect value.
This is my biggest concern at this point.

Best,

- Dongwon










Re: # of active session windows of a streaming job

Fabian Hueske-2
Hi Dongwon,

Yes, the counter should be stored in operator state, which is not available to Triggers.
Chesnay: Can a window function (like ProcessWindowFunction) access (read, write) the counter of its associated Trigger to checkpoint and restore it?

Best, Fabian

2018-06-20 16:59 GMT+02:00 Dongwon Kim <[hidden email]>:









Re: # of active session windows of a streaming job

Chesnay Schepler
Without modifications to Flink? No. By design, nothing can intercept or retrieve metrics through the metrics API.
For this pattern, the usual recommendation is to explicitly pass the metric to the components that require it.

If modifications are an option, what you could do is:
* define a counter in the OperatorIOMetricGroup,
* have the operator checkpoint/restore the counter, and
* access it in the trigger by casting your way through the MetricGroups to an OperatorMetricGroup, from which you can retrieve the OperatorIOMetricGroup.
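One detail of the checkpoint/restore step is a change of parallelism. The sketch below assumes, hypothetically, that each parallel instance stores its partial count as one entry of a list-style operator state and that a restore splits those entries over the new instances; the round-robin split and the class name `RescaledCounts` are simplifications of this sketch, not Flink's API. Under that assumption the total is preserved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Flink API): partial session counts survive a change
// of parallelism because the total is preserved when entries are redistributed.
final class RescaledCounts {
    // Each instance contributes its partial count as one list entry.
    static List<Long> snapshot(long[] perInstanceCounts) {
        List<Long> entries = new ArrayList<>();
        for (long c : perInstanceCounts) {
            entries.add(c);
        }
        return entries;
    }

    // Entries are dealt round-robin to the new instances and summed per instance.
    static long[] restore(List<Long> entries, int newParallelism) {
        long[] restored = new long[newParallelism];
        for (int i = 0; i < entries.size(); i++) {
            restored[i % newParallelism] += entries.get(i);
        }
        return restored;
    }

    static long total(long[] counts) {
        long sum = 0;
        for (long c : counts) {
            sum += c;
        }
        return sum;
    }
}
```

For example, partial counts 5, 3, and 7 restored onto two instances become 12 and 3, still 15 in total. Only the total is meaningful afterwards: keys can move between instances when the parallelism changes, so an individual instance's count no longer matches the sessions it currently holds.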


On 21.06.2018 11:16, Fabian Hueske wrote:
Hi Dongwon,

Yes, the counter state should be stored in operator state which is not available on Triggers.
Chesnay: Can a window function (like ProcessWindowFunction) access (read, write) the counter of its associated Trigger to checkpoint and restore it?

Best, Fabian

2018-06-20 16:59 GMT+02:00 Dongwon Kim <[hidden email]>:
Hi Fabian and Chesnay,

As Chesnay pointed out, it seems that I need to write the current counter (which is defined inside Trigger) into state which I think should be the operator state of the window operator.
However, as I previously said, TriggerContext allows for users to access only the partitioned state that are scoped to the key and the window of the current Trigger invocation.
There's no way for me to access to the operator state of the window operator through TriggerContext.
The partitioned state doesn't seem suitable as we have more than ten million keys.
This amount of keys could possibly break down the metric system and the external metric systems like Ganglia and Prometheus.

What I want the most is to achieve the goal using the current API (I'm using Flink-1.4.2) without modification.
But a change in TriggerContext seems unavoidable because it has to expose an additional method for users like me to access to the operator state of the window operator.

Thank you guys for the useful discussion.

p.s. Fabian, yes you're right. It is Trigger.clear(), not Trigger.onClose().

Best,
- Dongwon


On Wed, Jun 20, 2018 at 7:30 PM, Chesnay Schepler <[hidden email]> wrote:
Checkpointing of metrics is a manual process.
The operator must write the current value into state, retrieve it on restore and restore the counter's count.


On 20.06.2018 12:10, Fabian Hueske wrote:
Hi Dongwon,

You are of course right! We need to decrement the counter when the window is closed.

The idea of using Trigger.clear() (the clean up method is called clear() instead of onClose()) method is great!
It will be called when the window is closed but also when it is merged.
So, I think you are right and we only need to increment the counter in Trigger.onElement() and decrement in Trigger.clear().

I'm not 100% sure, but I doubt that metrics can checkpointed. Chesnay (in CC) would know that.
Not sure what would be the best approach if you need a fault tolerant solution.

Best, Fabian




2018-06-19 16:38 GMT+02:00 Dongwon Kim <[hidden email]>:
Hi Fabian,
Thanks a lot for your reply.

Do you need to number of active session windows as a DataStream or would you like to have it as a metric that you can expose.
I possible, I would recommend to expose it as a metric because they are usually easier to collect.
I want to have it as a metric and it doesn't look difficult thanks to the metric system exposed by TriggerContext.

In order to track how many session windows exist, we would need to increment a counter by one when a new window is created (or an element is assigned to a window, which is equivalent for session windows)
I agree with you that we need to increment a counter when Trigger.onElement() is called due to the characteristic of session windows.

and decrement the counter when windows are merged by the number of merged windows minus one.
You decrement the counter when windows are merged, but I think we need to decrement the counter when a window is expired as well.

However, decrementing the counter is difficult. Although the Trigger.onMerge() method is called, it does not know how many windows were merged (which is done by the WindowAssigner) and only sees the merged window. 
We assume that timestamps of records from a user are in ascending order, so only one window is closed at a time which simplifies the problem of how to decrement the counter.
Nevertheless, I think I need to decrement the counter in Trigger.onClose(), not Trigger.onMerge().
By doing that in Trigger.onClose(), we can take care of both cases: when a window is merged and when a window is expired.
How do you think about it?

The reason I mention state is to calculate the exact number of active sessions even after my Flink application is restarted from checkpoints or savepoints.
If we restore from a savepoint and the counter is initialized to 0, we'll see an incorrect value from a dashboard.
This is the biggest concern of mine at this point.

Best,

- Dongwon


On Tue, Jun 19, 2018 at 7:14 PM, Fabian Hueske <[hidden email]> wrote:
Hi Dongwon,

Do you need to number of active session windows as a DataStream or would you like to have it as a metric that you can expose.
I possible, I would recommend to expose it as a metric because they are usually easier to collect.

SessionWindows work internally as follows:
- every new record is added to a new window that starts at the timestamp of the record and ends at timestamp + gap size. When a record is added to a window, Trigger.onElement() is called.
- after a window was created, the session window assigner tries to merge window with overlapping ranges. When windows are merged, Trigger.onMerge() is called.

In order to track how many session windows exist, we would need to increment a counter by one when a new window is created (or an element is assigned to a window, which is equivalent for session windows) and decrement the counter when windows are merged by the number of merged windows minus one.

Incrementing the counter is rather easy and can be done in Trigger.onElement(), either by using state or a Counter metric (Triggers have access to the metric system).
However, decrementing the counter is difficult. Although the Trigger.onMerge() method is called, it does not know how many windows were merged (which is done by the WindowAssigner) and only sees the merged window. There might be a way to maintain state in a Trigger that allows to infer how many windows were merged.

Best, Fabian

2018-06-16 16:39 GMT+02:00 Dongwon Kim <[hidden email]>:
Hi Fabian,

I'm still eager to expose # of active sessions as a key metric of our service but I haven’t figured it out yet.

First of all, I want to ask you some questions regarding your suggestion.
You could implement a Trigger that fires when a new window is created and when the window is closed. A ProcessWindowFunction would emit a +1 if the window was created and a -1 when the window is closes.
Session windows are a bit special, because you also need to handle the case of merging windows, i.e., two opened windows can be merged and only one (the merged) window is closed. So would need to emit a -2 if a merged window was closes (assuming only two windows were merged).
Q1) 
How to fire when a new window is created and when the window is closed?
AFAIK, we can return TriggerResult only through the three functions: onElement, onEventTime, and onProcessingTime.
Q2)
Firing is to emit elements in windows down to the window function, not emitting values like +1, -1 and -2 which are not in windows.
Or do I miss something that you meant?
In order to do that, you'd need to carry the merging information forward. The Trigger.onMerge method cannot trigger the window function, but it could store the merging information in state that is later accessed.
Q3) 
I didn't understand what you mean at all. What do you mean by carrying the merging information?

Besides your suggestion, I implemented a custom trigger which is almost the same as EventTimeTrigger except the followings:
- it maintains a variable to count sessions in an instance of a window operator
- it increases the variable by 1 when onElement is invoked
- it decreases the variable by 1 when onClose is invoked
Considering the logic of Flink’s session window, it correctly counts sessions in an instance of a window operator. 

As you might have already noticed, this approach has a critical problem: there's no way to maintain an operator state inside a trigger. 
TriggerContext only allows to interact with state that is scoped to the window and the key of the current trigger invocation (as shown in Trigger#TriggerContext)

Now I've come to the conclusion that it might not be possible with the DataStream API.
Or do I need to approach the goal in a totally different way?

Best,

- Dongwon




Re: # of active session windows of a streaming job

Dongwon Kim-2
Hi Fabian and Chesnay,

Thank you guys.

Fabian: Unfortunately, as Chesnay said, MetricGroup doesn't allow a ProcessWindowFunction to access a counter defined in a Trigger.
Chesnay: I'm going to follow your advice on how to modify Flink. Thank you very much!

Best,

- Dongwon

On Thu, Jun 21, 2018 at 10:26 PM, Chesnay Schepler <[hidden email]> wrote:
Without modifications to Flink? No. By design nothing can intercept or retrieve metrics with the metrics API.
For this pattern the usual recommendation is to explicitly pass the metric to components that require it.

If modifications are an option, what you could do is
* define a counter in the OperatorIOMetricGroup,
* have the operator checkpoint/restore the counter, and
* access it in the trigger by casting your way through the MetricGroups to an OperatorMetricGroup, from which you can retrieve the OperatorIOMetricGroup.



On 21.06.2018 11:16, Fabian Hueske wrote:
Hi Dongwon,

Yes, the counter state should be stored in operator state, which is not available to Triggers.
Chesnay: Can a window function (like ProcessWindowFunction) access (read, write) the counter of its associated Trigger to checkpoint and restore it?

Best, Fabian

2018-06-20 16:59 GMT+02:00 Dongwon Kim <[hidden email]>:
Hi Fabian and Chesnay,

As Chesnay pointed out, it seems that I need to write the current counter value (the counter is defined inside the Trigger) into state, which I think should be the operator state of the window operator.
However, as I said before, TriggerContext only allows users to access the partitioned state that is scoped to the key and window of the current Trigger invocation.
There's no way for me to access the operator state of the window operator through TriggerContext.
Partitioned state doesn't seem suitable because we have more than ten million keys.
That many keys could overwhelm the metric system as well as external metric systems such as Ganglia and Prometheus.

What I want most is to achieve the goal with the current API (I'm using Flink-1.4.2), without modifications.
But a change to TriggerContext seems unavoidable, because it would have to expose an additional method that lets users like me access the operator state of the window operator.

Thank you guys for the useful discussion.

P.S. Fabian, yes, you're right. It is Trigger.clear(), not Trigger.onClose().

Best,
- Dongwon


On Wed, Jun 20, 2018 at 7:30 PM, Chesnay Schepler <[hidden email]> wrote:
Checkpointing of metrics is a manual process.
The operator must write the current value into state, retrieve it on restore and restore the counter's count.
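The manual process can be sketched in plain Java as follows. All classes here (`CheckpointedCounterSketch`, `SimpleCounter`, `CountingOperator`) are hypothetical stand-ins, not Flink's metrics or state API. `SimpleCounter` only loosely mirrors the inc/dec/getCount operations of a metric counter. The sketch shows why the step is needed: a freshly created counter starts at zero, so its value must be written into state at checkpoint time and added back after restore.

```java
/**
 * Hypothetical sketch (not Flink's metrics or state API) of checkpointing a
 * metric counter by hand: write its value into state, then re-apply it on restore.
 */
final class CheckpointedCounterSketch {

    /** Stand-in for a metric counter; it keeps its value only in memory. */
    static final class SimpleCounter {
        private long count;

        void inc(long n) { count += n; }
        void dec(long n) { count -= n; }
        long getCount() { return count; }
    }

    /** Stand-in for an operator that owns the counter. */
    static final class CountingOperator {
        // A new operator instance (e.g. after a restart) starts with a counter of 0.
        final SimpleCounter activeSessions = new SimpleCounter();

        /** On checkpoint: write the current counter value into state. */
        long snapshotState() {
            return activeSessions.getCount();
        }

        /** On restore: increment the fresh (zero) counter by the saved value. */
        void restoreState(long savedCount) {
            activeSessions.inc(savedCount);
        }
    }

    public static void main(String[] args) {
        CountingOperator before = new CountingOperator();
        before.activeSessions.inc(5);
        before.activeSessions.dec(2);
        long checkpointed = before.snapshotState(); // 3

        CountingOperator restarted = new CountingOperator();
        System.out.println(restarted.activeSessions.getCount()); // prints 0: the value is lost
        restarted.restoreState(checkpointed);
        System.out.println(restarted.activeSessions.getCount()); // prints 3
    }
}
```

Restoring by incrementing, rather than assigning, reflects that the stand-in counter offers no setter. Only increments and decrements can move it.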


On 20.06.2018 12:10, Fabian Hueske wrote:
Hi Dongwon,

You are of course right! We need to decrement the counter when the window is closed.

The idea of using the Trigger.clear() method (the clean-up method is called clear(), not onClose()) is great!
It will be called when the window is closed but also when it is merged.
So, I think you are right and we only need to increment the counter in Trigger.onElement() and decrement in Trigger.clear().

I'm not 100% sure, but I doubt that metrics can be checkpointed. Chesnay (in CC) would know.
I'm not sure what the best approach would be if you need a fault-tolerant solution.

Best, Fabian




2018-06-19 16:38 GMT+02:00 Dongwon Kim <[hidden email]>:
Hi Fabian,
Thanks a lot for your reply.

> Do you need the number of active session windows as a DataStream, or would you like to have it as a metric that you can expose?
> If possible, I would recommend exposing it as a metric, because metrics are usually easier to collect.
I want it as a metric, and it doesn't look difficult thanks to the metric system exposed by TriggerContext.

> In order to track how many session windows exist, we would need to increment a counter by one when a new window is created (or an element is assigned to a window, which is equivalent for session windows)
I agree that we need to increment a counter when Trigger.onElement() is called, given this characteristic of session windows.

> and decrement the counter when windows are merged by the number of merged windows minus one.
You decrement the counter when windows are merged, but I think we also need to decrement it when a window expires.

> However, decrementing the counter is difficult. Although the Trigger.onMerge() method is called, it does not know how many windows were merged (which is done by the WindowAssigner) and only sees the merged window.
We assume that the timestamps of records from a user are in ascending order, so only one window is closed at a time, which simplifies the problem of how to decrement the counter.
Nevertheless, I think I need to decrement the counter in Trigger.onClose(), not Trigger.onMerge().
By doing that in Trigger.onClose(), we can take care of both cases: when a window is merged and when a window expires.
What do you think?

The reason I mention state is to calculate the exact number of active sessions even after my Flink application is restarted from checkpoints or savepoints.
If we restore from a savepoint and the counter is initialized to 0, we'll see an incorrect value on the dashboard.
This is my biggest concern at this point.

Best,

- Dongwon


On Tue, Jun 19, 2018 at 7:14 PM, Fabian Hueske <[hidden email]> wrote:
Hi Dongwon,

Do you need the number of active session windows as a DataStream, or would you like to have it as a metric that you can expose?
If possible, I would recommend exposing it as a metric, because metrics are usually easier to collect.

SessionWindows work internally as follows:
- every new record is added to a new window that starts at the timestamp of the record and ends at timestamp + gap size. When a record is added to a window, Trigger.onElement() is called.
- after a window is created, the session window assigner tries to merge windows with overlapping ranges. When windows are merged, Trigger.onMerge() is called.

In order to track how many session windows exist, we would need to increment a counter by one when a new window is created (or an element is assigned to a window, which is equivalent for session windows) and decrement the counter when windows are merged by the number of merged windows minus one.
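The accounting above can be checked with a small single-key model. It is a hypothetical sketch, not Flink's implementation; `SessionCountModel` is invented here. It makes these simplifications:

- Windows are half-open ranges `[ts, ts + gap)` that merge only when they overlap. How Flink treats windows that merely touch is not modeled.
- Expiry is driven by explicit watermark calls.
- Unlike a Flink Trigger, which only sees the merged window in `onMerge()`, the model performs the merge itself, so it knows how many windows were merged.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical single-key model (not Flink code) of event-time session windows.
 * Counter rules: +1 per record (each record opens a window), -(k - 1) when k windows
 * (including the new one) merge into one, and -1 when a window expires.
 */
final class SessionCountModel {
    private final long gap;
    private final List<long[]> windows = new ArrayList<>(); // live windows as {start, end}
    private long counter = 0;

    SessionCountModel(long gap) {
        this.gap = gap;
    }

    void onRecord(long ts) {
        long start = ts;
        long end = ts + gap;
        counter += 1; // every record creates a new window
        int merged = 1; // the new window itself
        Iterator<long[]> it = windows.iterator();
        while (it.hasNext()) {
            long[] w = it.next();
            if (w[0] < end && start < w[1]) { // half-open ranges overlap
                start = Math.min(start, w[0]);
                end = Math.max(end, w[1]);
                it.remove();
                merged++;
            }
        }
        counter -= merged - 1; // k windows collapse into one
        windows.add(new long[] {start, end});
    }

    /** Expires every window whose end is at or before the watermark. */
    void onWatermark(long watermark) {
        Iterator<long[]> it = windows.iterator();
        while (it.hasNext()) {
            if (it.next()[1] <= watermark) {
                it.remove();
                counter -= 1;
            }
        }
    }

    long counter() {
        return counter;
    }

    int liveSessions() {
        return windows.size();
    }

    public static void main(String[] args) {
        SessionCountModel m = new SessionCountModel(10);
        m.onRecord(0);  // session [0, 10)
        m.onRecord(30); // session [30, 40)
        m.onRecord(15); // separate session [15, 25)
        m.onRecord(8);  // [8, 18) bridges [0, 10) and [15, 25) into [0, 25)
        System.out.println(m.counter() + " " + m.liveSessions()); // prints "2 2"
        m.onWatermark(25); // [0, 25) expires
        System.out.println(m.counter() + " " + m.liveSessions()); // prints "1 1"
    }
}
```

In this run, the record at 8 merges three windows: itself plus two existing sessions. The counter therefore goes up by 1 and down by 2. A per-operator count would be the sum of such per-key counters over all keys handled by the operator instance.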

Incrementing the counter is rather easy and can be done in Trigger.onElement(), either by using state or a Counter metric (Triggers have access to the metric system).
However, decrementing the counter is difficult. Although the Trigger.onMerge() method is called, it does not know how many windows were merged (which is done by the WindowAssigner) and only sees the merged window. There might be a way to maintain state in a Trigger that allows inferring how many windows were merged.

Best, Fabian

2018-06-16 16:39 GMT+02:00 Dongwon Kim <[hidden email]>:
Hi Fabian,

I'm still eager to expose the number of active sessions as a key metric of our service, but I haven't figured out how yet.
