Hi all,
I'm using a ProcessWindowFunction on a keyed stream with the following definition:

    final SingleOutputStreamOperator<Message> processWindowFunctionStream =
        keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
            .process(new CustomProcessWindowFunction())
            .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
            .name("Process window function");

My checkpointing configuration uses the RocksDB state backend with incremental checkpointing and EXACTLY_ONCE mode. At runtime I noticed that the state size of the process window operator keeps increasing, even though data ingestion is static (the same keys and the same message frequency). I tried to reproduce this with a minimal, similar setup here: https://github.com/loliver1234/flink-process-window-function and succeeded. Testing conditions:
Testing scenario:
What I checked:
Tested with:
In our staging environment, we noticed that the state of that operator keeps growing indefinitely, reaching as much as 1.5 GB for 100k unique keys after several months.
With best regards
Oliwer
Hi Oliwer,

From the description, it seems the state is not being cleared. Maybe you could check how many times {{windowState.clear()}} is triggered in {{WindowOperator#processElement}} and try to figure out why the state is not cleared.

Best,
Congxian

On Fri, Sep 27, 2019 at 4:14 PM, Oliwer Kostera <[hidden email]> wrote:
Hi,

I'm not sure what you mean by windowState.clear(). If I understand you correctly, it's the windowState from the ProcessWindowFunction Context, which is a KeyedStateStore. A KeyedStateStore manages registered keyed states, which I don't have, so without a descriptor I can't access any clear() method. There is no state that I manage explicitly, as you can see here: https://github.com/loliver1234/flink-process-window-function/blob/master/src/main/java/personal/kostera/functions/CustomProcessWindowFunction.java

With best regards
Oliwer

On 01.10.2019 07:48, Congxian Qiu wrote:
Hi Oliwer,

I think you are right; something seems to be going wrong. Just to clarify: are you sure that the growing state size is caused by the window operator? From your description, I assume that the state size does not depend (solely) on the number of distinct keys; otherwise, it would stop growing at some point. This would be a hint that every window leaves some state behind. AFAIK, processing-time session windows are not very common, so there might be a bug in the implementation. Could you create a Jira issue with a description of the problem? It would be great if you could provide a reproducible example with a data generator source.

Thank you,
Fabian

On Tue, Oct 1, 2019 at 11:18 AM, Oliwer Kostera <[hidden email]> wrote:
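Fabian's argument (state that depends only on the number of keys should plateau, so unbounded growth suggests every window leaves something behind) can be illustrated with a toy model. This is a hypothetical simulation, not Flink's WindowOperator: it assumes a fixed set of keys, each sending one message per period, with a session gap shorter than the period so that every message forms its own session. When a fired window's state is fully cleared, live state never exceeds one entry per key; when each fired window leaves a residue (modeled here as a set of entries that are never removed), the residue grows with the number of fired windows even though keys and rate are constant. The class name WindowStateGrowthModel and all parameters are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model, NOT Flink code: compares state that is cleared when a
// session window fires with state that leaves a residue behind per fired window.
class WindowStateGrowthModel {

    static final class Result {
        final int maxLiveEntries;  // most sessions open at once (bounded by #keys)
        final int leftoverEntries; // residue left by fired windows (0 if fully cleared)
        final int windowsFired;

        Result(int maxLiveEntries, int leftoverEntries, int windowsFired) {
            this.maxLiveEntries = maxLiveEntries;
            this.leftoverEntries = leftoverEntries;
            this.windowsFired = windowsFired;
        }

        @Override
        public String toString() {
            return "live<=" + maxLiveEntries + ", leftover=" + leftoverEntries + ", fired=" + windowsFired;
        }
    }

    static Result simulate(int keys, long periodMs, long gapMs, long durationMs, boolean leakPerWindow) {
        // key -> end timestamp of its open session (gapMs after the key's last message)
        Map<Integer, Long> openSessions = new HashMap<>();
        // hypothetical residue: one "key@windowEnd" entry per fired window, never removed
        Set<String> leftovers = new HashSet<>();
        int maxLive = 0;
        int fired = 0;

        for (long now = 0; now < durationMs; now += periodMs) {
            // Fire every session whose gap has elapsed, then drop its contents.
            Iterator<Map.Entry<Integer, Long>> it = openSessions.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Integer, Long> session = it.next();
                if (session.getValue() <= now) {
                    fired++;
                    if (leakPerWindow) {
                        leftovers.add(session.getKey() + "@" + session.getValue());
                    }
                    it.remove();
                }
            }
            // Each key sends one message: it opens a new session or extends the open one.
            for (int key = 0; key < keys; key++) {
                openSessions.put(key, now + gapMs);
            }
            maxLive = Math.max(maxLive, openSessions.size());
        }
        return new Result(maxLive, leftovers.size(), fired);
    }

    public static void main(String[] args) {
        // 100 keys, one message per key per second, 100 ms gap, one simulated minute.
        System.out.println("cleared:  " + simulate(100, 1_000, 100, 60_000, false));
        System.out.println("residual: " + simulate(100, 1_000, 100, 60_000, true));
    }
}
```

With the parameters in main, both runs fire 5,900 windows and never hold more than 100 live sessions, but the residual run ends with 5,900 leftover entries versus 0 for the cleared run. Doubling the simulated duration roughly doubles the leftover count while the live count stays at 100, which is the "not solely dependent on the number of distinct keys" pattern described above.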
Hi,

I actually created a Jira issue before posting to the mailing list. Today I added steps to reproduce, along with the test outcomes for different scenarios, to the repository.

Jira issue: https://issues.apache.org/jira/browse/FLINK-14197
Repository: https://github.com/loliver1234/flink-process-window-function

With best regards
Oliwer

On 02.10.2019 12:05, Fabian Hueske wrote: