Window metadata removal

gil bl
Hi,

I'm interested in why metadata such as WindowOperator and InternalTimer entries is kept for each pane for the windowSize + allowedLateness period.
  • What is the purpose of keeping this data if no new events are expected to enter the pane?
  • Is there any way this metadata can be released earlier?

Re: Window metadata removal

Fabian Hueske-2
Hi,

A window needs to keep the data as long as it expects new data.
This is clearly the case before the end time of the window has been reached. If my window ends at 12:30 (in event time), I want to wait (at least) until 12:30 before I remove any data, right?

In case you expect some data to be late, you can configure allowedLateness.
Let's say we configure an allowedLateness of 10 minutes. In that case, Flink would keep the metadata of the window that closes at 12:30 until 12:40.
The data is kept so that the result of the window can still be updated until allowedLateness has passed.
If, for example, a late record for this window arrives when the watermark is at 12:38, we can still update the result of the window because we kept all required data.

If you don't need allowedLateness, don't configure it (the default is 0).
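The lifetime rule described above can be sketched in a few lines. This is a simplified Python model, not Flink's WindowOperator: times are minutes since midnight, the 12:00 to 12:30 window is a hypothetical 30-minute tumbling window, and Flink's detail of measuring from the window's maxTimestamp() (end minus 1 ms) is ignored.

```python
from dataclasses import dataclass


def hhmm(s: str) -> int:
    """Convert 'HH:MM' into minutes since midnight (the illustrative time unit)."""
    hours, minutes = s.split(":")
    return int(hours) * 60 + int(minutes)


@dataclass(frozen=True)
class Window:
    start: int  # inclusive, minutes since midnight
    end: int    # exclusive, minutes since midnight


def cleanup_time(window: Window, allowed_lateness: int) -> int:
    # State and timers of the window are kept until the watermark reaches this time.
    # Simplification: Flink measures from window.maxTimestamp() (end - 1 ms).
    return window.end + allowed_lateness


def accepts_late_record(window: Window, allowed_lateness: int, watermark: int) -> bool:
    # Before cleanup, the window's data still exists, so a late record can update the result.
    return watermark < cleanup_time(window, allowed_lateness)


w = Window(hhmm("12:00"), hhmm("12:30"))  # hypothetical 30-minute window
print(cleanup_time(w, 10) == hhmm("12:40"))        # True
print(accepts_late_record(w, 10, hhmm("12:38")))   # True
print(accepts_late_record(w, 10, hhmm("12:40")))   # False
print(accepts_late_record(w, 0, hhmm("12:31")))    # False: default allowedLateness of 0
```

With allowedLateness left at its default of 0, the cleanup time coincides with the end of the window, so the window's metadata is released as soon as the watermark passes it.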

Best, Fabian

In reply to: gil bl <[hidden email]>, Mon, 2 Sept 2019 at 16:46

Re: Window metadata removal

gil bl
Hi Fabian,  
Thank you for your reply.

I'm not sure my question was clear enough, so I'll try to explain our scenario:
  1. We are working in “event time” mode.
  2. We want to handle ‘late data’ up to the last X days (for example, the last 7 days).
  3. For each incoming event:
    1. The event is aggregated by a window function.
    2. When the window is “fired”, the accumulated data is forwarded to a “sink” function and all data is purged from the window.
  4. If late data arrives for the same windows, the same logic (as in step 3) is applied: when a window fires, the data is accumulated from scratch, sent to a “sink”, and purged from the window.
  5. We are not using the default trigger.
We expect the flow above to result in fragmented data, i.e., several outputs with the same <key, windows> that aggregate different sets of events.
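A minimal Python model of this flow may help show why the outputs are fragmented and why the per-window metadata outlives the purged contents. Everything here is an assumption for illustration, not Flink code: tumbling windows of a hypothetical WINDOW_SIZE, a sum aggregate, a trigger that fires and purges once the watermark passes the window end and fires immediately for each allowed late record, and a `retained` map that stands in for the per-window cleanup timers (InternalTimer) that Flink holds until window end + allowed lateness.

```python
from collections import defaultdict

WINDOW_SIZE = 10        # hypothetical tumbling window size (abstract time units)
ALLOWED_LATENESS = 50   # hypothetical allowed lateness, same unit


def run(stream):
    """stream items: ("event", key, ts, value) or ("watermark", ts)."""
    contents = defaultdict(int)  # (key, window_start) -> sum since the last purge
    retained = {}                # (key, window_start) -> cleanup time (models timer metadata)
    outputs = []                 # (key, window_start, partial_sum)
    watermark = float("-inf")

    for item in stream:
        if item[0] == "watermark":
            watermark = item[1]
            # On-time firing: emit and purge every pane whose window has ended.
            for pane in sorted(p for p in contents if p[1] + WINDOW_SIZE <= watermark):
                outputs.append((pane[0], pane[1], contents.pop(pane)))
            # Metadata is only released once the watermark reaches the cleanup time.
            for pane in [p for p, t in retained.items() if t <= watermark]:
                del retained[pane]
            continue

        _, key, ts, value = item
        start = ts - ts % WINDOW_SIZE
        end = start + WINDOW_SIZE
        if end + ALLOWED_LATENESS <= watermark:
            continue  # later than the allowed lateness: dropped
        pane = (key, start)
        retained[pane] = end + ALLOWED_LATENESS
        contents[pane] += value
        if end <= watermark:
            # Late but still allowed: fire and purge immediately, starting from scratch.
            outputs.append((key, start, contents.pop(pane)))

    return outputs, set(retained)


stream = [("event", "a", 1, 5), ("event", "a", 3, 2), ("watermark", 10),
          ("event", "a", 4, 4), ("event", "a", 8, 1), ("watermark", 30)]
print(run(stream))
```

Here the three outputs for ("a", 0) are the fragmented results, and ("a", 0) is still in `retained` at watermark 30 even though its contents were purged; only a watermark at or beyond 60 (end 10 + lateness 50) releases it.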

We encounter the following problem:
Since we have a huge number of different <key, windows>, the metadata (WindowOperator, InternalTimer) is kept in memory until the end of the ‘allowed lateness’ period. This causes our job to run out of memory.
Here is an estimate of the memory required for the window metadata alone:
Metadata size for each <key, windows> is at least 64 bytes.
If we have 200,000,000 <key, windows> per day and the allowed lateness is set to 7 days:
200,000,000 * 64 bytes * 7 = 89,600,000,000 bytes ≈ 83 GiB (≈ 90 GB)
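The estimate can be reproduced as follows. It assumes, as stated in the message, that every <key, windows> pane created during the last 7 days is still held and that 64 bytes per pane is a lower bound; the "~83" figure corresponds to binary gigabytes (GiB).

```python
GIB = 2 ** 30


def window_metadata_bytes(panes_per_day: int, bytes_per_pane: int, retention_days: int) -> int:
    # Lower bound: every pane created during the retention period is still held in memory.
    return panes_per_day * bytes_per_pane * retention_days


total = window_metadata_bytes(200_000_000, 64, 7)
print(total)                  # 89600000000
print(round(total / GIB, 1))  # 83.4
```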

For the scenario above, the window metadata is useless.
Is there a way to keep using the window API and set allowed lateness without keeping the window metadata until the end of the allowed lateness period?
(maybe as a new feature 😊?)


In reply to: "Fabian Hueske" <[hidden email]>, Thu, 5 Sept 2019 at 13:04

Re: Window metadata removal

Fabian Hueske-2
Hi,

Oh, now I understand your problem.
I don't think that Flink is able to remove the metadata early. The implementation is designed for the general case, which also needs to support windows whose data is not purged.
Something that might work is to not configure the window operator with allowed lateness (hence dropping all late records).
Instead, you duplicate the stream before the window operator and add another operator (based on a ProcessFunction) that drops all "in-time" data and only forwards late data that is at most 7 days old.
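The split Fabian describes can be illustrated with a small Python model of where each record ends up. This is a sketch under stated assumptions, not Flink code: 1-hour tumbling windows (WINDOW_SIZE_MS is hypothetical), millisecond timestamps, no allowed lateness on the window operator, and both branches observing the same watermark. In an actual ProcessFunction, the record timestamp and the current watermark would come from `ctx.timestamp()` and `ctx.timerService().currentWatermark()`.

```python
DAY_MS = 24 * 60 * 60 * 1000
WINDOW_SIZE_MS = 60 * 60 * 1000   # hypothetical 1-hour tumbling windows
MAX_LATENESS_MS = 7 * DAY_MS      # "at most 7 days old"


def window_max_timestamp(ts: int) -> int:
    # Last timestamp covered by the tumbling window that contains ts (end - 1 ms).
    start = ts - ts % WINDOW_SIZE_MS
    return start + WINDOW_SIZE_MS - 1


def route(ts: int, watermark: int) -> str:
    """Classify a record by which branch of the duplicated stream ends up using it."""
    if window_max_timestamp(ts) > watermark:
        return "window"      # in time: the window operator (no allowed lateness) handles it
    if watermark - ts <= MAX_LATENESS_MS:
        return "late-path"   # dropped by the window operator, forwarded by the ProcessFunction
    return "dropped"         # older than 7 days relative to the watermark


wm = 10 * DAY_MS
print(route(wm + 5, wm), route(wm - 1, wm), route(wm - 8 * DAY_MS, wm))
```

Each record is used by exactly one branch, so the window operator can release its metadata at the window end while late records up to 7 days old still reach the separate branch; how that branch aggregates or emits them is left open here.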

Alternatively, you can of course also scale out the program to more machines to add more memory.
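Scaling out helps because window contents and timers are keyed state, which Flink partitions across parallel instances by key. Assuming keys are spread evenly (skew would make the largest share bigger) and a hypothetical budget of 16 GiB of window metadata per machine, a rough machine count for the 200,000,000 * 64 bytes * 7 estimate in this thread could be computed like this:

```python
GIB = 2 ** 30


def machines_needed(total_bytes: int, budget_per_machine_bytes: int) -> int:
    # Ceiling division, assuming keyed state is spread evenly across machines.
    return -(-total_bytes // budget_per_machine_bytes)


total = 200_000_000 * 64 * 7             # bytes of window metadata (estimate from the thread)
print(machines_needed(total, 16 * GIB))  # 6
```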

Best,
Fabian

In reply to: gil bl <[hidden email]>, Wed, 18 Sept 2019 at 08:39