We currently have a Flink 1.8 application deployed on Kinesis Data Analytics using the RocksDB state backend. Our application joins across 3 different Kinesis streams using an interval join. We noticed that our checkpoint sizes continue to increase over time. Eventually we get OOM failures while writing checkpoints and need to restart the application without restoring from a savepoint.
Does this kind of application require a state TTL on the join operator? I assumed that, since it is an interval join, events that fall outside of the lower time bound would automatically be expired from the state. Is that a correct assumption?
Thanks,
Chris
Hi Chris,
There is no need to set a state TTL if the only stateful operators are interval joins. Please check the watermarks of the input streams: has any of them failed to advance for a long time?
Best regards,
JING ZHANG
McBride, Chris <[hidden email]> wrote on Saturday, June 5, 2021 at 3:17 AM:
In reply to this post by McBride, Chris
Hi Chris,
The interval join should clean up state that was not joined within the interval, so you don't need to set a state TTL. (In fact, the state used by the interval join is not exposed, so you cannot set a TTL on it; TTL is only available for user-defined state.)
A growing checkpoint size does not necessarily mean that your actual state is growing. RocksDB writes a delete marker when an element is removed, and the obsolete data is only physically cleared after compaction. You can check whether the state is really growing by using non-incremental checkpoints and seeing how large the state is.
Moreover, the OOM should not be related to RocksDB, as it uses off-heap native memory. You may need to dig into what occupies the JVM memory during checkpoints.
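To illustrate why a stalled watermark matters for the cleanup described above, here is a minimal sketch in plain Java. It is a simplified model, not Flink's actual interval join implementation: the class name, the 10-second bound, and the single-sided buffer are all assumptions made for illustration. Buffered events are only dropped once the watermark passes their timestamp plus the upper bound, so if the watermark stops advancing, the buffer keeps growing. Note that an operator with several inputs advances its watermark to the minimum of its input watermarks, so one idle input can be enough to hold cleanup back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simplified model (NOT Flink's implementation) of one side of an interval join buffer.
public class IntervalJoinBufferSketch {
    // Hypothetical bound: an event at time t may still match events up to t + UPPER_BOUND_MS.
    static final long UPPER_BOUND_MS = 10_000L;

    // Buffered events keyed by event timestamp.
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();

    public void add(long timestamp, String event) {
        buffer.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(event);
    }

    // Drop every event that can no longer be joined: timestamp + UPPER_BOUND_MS <= watermark.
    public void onWatermark(long watermark) {
        buffer.headMap(watermark - UPPER_BOUND_MS, true).clear();
    }

    public int size() {
        return buffer.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        IntervalJoinBufferSketch stalled = new IntervalJoinBufferSketch();
        IntervalJoinBufferSketch advancing = new IntervalJoinBufferSketch();
        for (long t = 0; t < 100_000; t += 1_000) {
            stalled.add(t, "event@" + t);      // watermark never advances: nothing is cleaned up
            advancing.add(t, "event@" + t);
            advancing.onWatermark(t);          // watermark follows event time: old events are dropped
        }
        System.out.println("buffered with stalled watermark:   " + stalled.size());
        System.out.println("buffered with advancing watermark: " + advancing.size());
    }
}
```

In this model the buffer with the advancing watermark stays bounded by the interval, while the stalled one retains every event it has seen, which is the pattern to look for when checkpoint sizes keep rising.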
Best
Yun Tang
From: McBride, Chris <[hidden email]>
Sent: Saturday, June 5, 2021 3:17
To: [hidden email] <[hidden email]>
Subject: Question about State TTL and Interval Join