Hi, I'm interested in why metadata like WindowOperator and InternalTimer entries is kept for the windowSize + allowedLateness period for each pane.
|
Hi,

A window needs to keep its data for as long as it expects new data. This is clearly the case before the end time of the window is reached: if my window ends at 12:30, I want to wait (at least) until 12:30 before I remove any data, right?

In case you expect some data to arrive late, you can configure allowedLateness. Let's say we configure an allowedLateness of 10 minutes. In that case, Flink keeps the metadata of the window that closes at 12:30 until 12:40. The data is kept so that the result of the window can still be updated until the allowedLateness has passed. If we receive a late record at 12:38, for example, we can still update the result of the window because we kept all the required data.

If you don't need allowedLateness, don't configure it (the default is 0).

Best, Fabian

On Mon, Sep 2, 2019 at 16:46, gil bl <[hidden email]> wrote:
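For reference, a minimal sketch of how the 12:30 / 10-minute example above maps onto the DataStream API. The stream, event type, and function names (events, MyEvent, MyReduceFunction) are placeholders for illustration, not from this thread:

// 30-minute event-time windows; late records are still accepted for 10 more minutes.
// Records arriving even later can be captured via a side output instead of being dropped.
OutputTag<MyEvent> lateTag = new OutputTag<MyEvent>("too-late"){};

SingleOutputStreamOperator<MyEvent> result = events
    .keyBy(e -> e.getKey())
    .window(TumblingEventTimeWindows.of(Time.minutes(30)))
    .allowedLateness(Time.minutes(10))   // window state for a 12:30 window is kept until 12:40
    .sideOutputLateData(lateTag)         // records arriving after 12:40 go to the side output
    .reduce(new MyReduceFunction());     // ReduceFunction<MyEvent>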
|
Hi Fabian,

Thank you for your reply. I'm not sure my question was clear enough, so I'll try to explain our scenario:
We expect the flow above to result in fragmented data, i.e. several outputs with the same <key, window> that aggregate different sets of events.

We encounter the following problem: since we have a huge number of distinct <key, window> pairs, the metadata (WindowOperator, InternalTimer) is kept in memory until the end of the allowedLateness period. This causes our job to run out of memory.

Here is a calculation of the memory required just for the window metadata: the metadata for each <key, window> pair is at least 64 bytes. If we have 200,000,000 <key, window> pairs per day and the allowed lateness is set to 7 days: 200,000,000 * 64 * 7 = ~83 GB.

For the scenario above the window metadata is useless. Is there a way to keep using the window API, set allowed lateness, and not keep the window metadata until the end of the allowed lateness period? (maybe as a new feature 😊?)

On 05.09.2019 at 13:04, "Fabian Hueske" <[hidden email]> wrote:
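(As a quick check of the arithmetic above: 200,000,000 * 64 bytes * 7 = 89.6 * 10^9 bytes, which is roughly 83 GiB, so the ~83 GB figure is consistent.)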
|
Hi,

Oh, now I understand your problem. I don't think that Flink is able to remove the metadata early. The implementation is designed for the general case, which needs to support windows whose data is not purged.

Something that might work is to not configure the window operator with allowed lateness (hence dropping all late records). Instead, you duplicate the stream before the window operator and add another operator (based on a ProcessFunction) that drops all "in-time" data and only forwards data that is at most 7 days old; see the sketch after this message.

Alternatively, you can of course also scale the program out to more machines to add more memory.

Best, Fabian

On Wed, Sep 18, 2019 at 08:39, gil bl <[hidden email]> wrote:
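A rough sketch of the ProcessFunction-based filter described above, assuming an event type MyEvent with timestamps assigned upstream; the class name and the 7-day bound mirror the discussion and are illustrative, not code from the thread:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Forwards only records that are late (behind the current watermark) but at most
// 7 days old; "in-time" records are dropped because the windowed path handles them.
public class LateDataFilter extends ProcessFunction<MyEvent, MyEvent> {

    private static final long SEVEN_DAYS_MS = 7L * 24 * 60 * 60 * 1000;

    @Override
    public void processElement(MyEvent event, Context ctx, Collector<MyEvent> out) {
        Long ts = ctx.timestamp();                          // event timestamp assigned upstream
        long watermark = ctx.timerService().currentWatermark();
        if (ts == null) {
            return;                                         // no event time -> cannot classify, drop
        }
        if (ts < watermark && ts >= watermark - SEVEN_DAYS_MS) {
            out.collect(event);                             // late, but still within 7 days
        }
    }
}

The second copy of the stream would run through this filter into whatever late-update logic is needed, while the first copy goes through the window operator without allowedLateness.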
|