Question about State TTL and Interval Join


Question about State TTL and Interval Join

McBride, Chris

We currently have a Flink 1.8 application deployed on Kinesis Data Analytics using the RocksDB state backend. Our application joins 3 different Kinesis streams using an interval join. We noticed that our checkpoint sizes keep increasing over time; eventually we hit OOM failures while writing checkpoints and have to restart the application without restoring from a savepoint.

Does this kind of application require a state TTL on the join operator? I assumed that, since it is an interval join, events that fall outside the lower time bound would automatically be expired from state. Is that a correct assumption?

Thanks,

Chris


Re: Question about State TTL and Interval Join

JING ZHANG
Hi Chris,
There is no need to set a state TTL if IntervalJoin is your only stateful operator.
Please check the watermarks of the two input streams: has the watermark stopped advancing for a long time?
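To see why a stalled watermark leads to ever-growing state, here is a toy, single-key Python model of an interval join. It is a simplified sketch, not Flink's `IntervalJoinOperator`; the class `ToyIntervalJoin` and its methods are hypothetical. It assumes, as in Flink, that a two-input operator's watermark is the minimum of its two input watermarks and that buffered rows are removed only once that watermark passes their cleanup time.

```python
from collections import defaultdict


class ToyIntervalJoin:
    """Hypothetical single-key toy model of an event-time interval join.

    A left row at time t joins right rows with t + lower <= ts <= t + upper.
    Buffered rows are dropped only after the operator watermark passes their
    cleanup time. Simplified illustration only, not Flink's implementation.
    """

    def __init__(self, lower, upper):
        assert lower <= upper
        self.lower, self.upper = lower, upper
        self.input_wm = {"left": float("-inf"), "right": float("-inf")}
        self.left = defaultdict(list)   # timestamp -> buffered left values
        self.right = defaultdict(list)  # timestamp -> buffered right values

    @property
    def watermark(self):
        # A two-input operator's watermark is the minimum of its inputs' watermarks
        return min(self.input_wm.values())

    def process(self, side, value, ts):
        if ts < self.watermark:  # late row: dropped, never buffered
            return []
        is_left = side == "left"
        ours, other = (self.left, self.right) if is_left else (self.right, self.left)
        # Bounds relative to this row's timestamp (mirrored for the right side)
        lo, hi = (self.lower, self.upper) if is_left else (-self.upper, -self.lower)
        ours[ts].append(value)
        matches = []
        for other_ts, values in other.items():
            if ts + lo <= other_ts <= ts + hi:
                matches += [(value, v) if is_left else (v, value) for v in values]
        return matches

    def advance_watermark(self, side, wm):
        self.input_wm[side] = max(self.input_wm[side], wm)
        # Cleanup time: row timestamp plus the side's relative upper bound, if positive
        self._purge(self.left, max(self.upper, 0))
        self._purge(self.right, max(-self.lower, 0))

    def _purge(self, buf, offset):
        for ts in [t for t in buf if t + offset <= self.watermark]:
            del buf[ts]

    def buffered(self):
        return sum(len(v) for buf in (self.left, self.right) for v in buf.values())


join = ToyIntervalJoin(lower=-5, upper=5)
for t in range(0, 100, 10):
    join.process("left", f"L{t}", t)
    join.process("right", f"R{t}", t + 2)

join.advance_watermark("left", 200)  # only the left input moves on
print(join.buffered())               # 20: right watermark still -inf, nothing purged
join.advance_watermark("right", 50)
print(join.buffered())               # 10: rows with cleanup time <= 50 were dropped
```

If one input's watermark never advances (for example, because that stream stops receiving records), the operator watermark stays at the minimum and nothing is purged, which would be consistent with checkpoints that keep growing.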

Best regards,
JING ZHANG

In reply to McBride, Chris <[hidden email]>, sent Saturday, June 5, 2021, 3:17 AM

Re: Question about State TTL and Interval Join

Yun Tang
In reply to this post by McBride, Chris
Hi Chris,

Interval join cleans up state that can no longer be joined within the interval, so you don't need to set a state TTL. (In fact, the states used by the interval join are internal and not exposed, so you cannot set a TTL on them anyway; TTL is only available for state that users declare themselves.)

A steadily increasing checkpoint size does not necessarily mean that your actual state is also increasing. When an element is removed, RocksDB writes a delete marker (a tombstone), and the obsolete data is only physically removed after compaction. You can check whether the state is really growing by taking non-incremental checkpoints and looking at their size.
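The tombstone effect can be illustrated with a toy log-structured store in Python. This is a hypothetical, heavily simplified sketch (RocksDB flushes and compacts in the background, level by level), assuming that an incremental checkpoint roughly tracks the files currently on disk while a full checkpoint roughly tracks only the live entries.

```python
class ToyLsmStore:
    """Hypothetical toy LSM-style store: writes and deletes are appended to immutable files.

    A delete does not erase data; it appends a tombstone (value None). Space is
    reclaimed only when compaction rewrites the files, keeping live entries only.
    """

    def __init__(self):
        self.memtable = []  # pending (key, value) writes; value None = tombstone
        self.files = []     # flushed, immutable "SST files"

    def put(self, key, value):
        self.memtable.append((key, value))

    def delete(self, key):
        self.memtable.append((key, None))  # tombstone; the old value stays on disk

    def flush(self):
        if self.memtable:
            self.files.append(self.memtable)
            self.memtable = []

    def live_state(self):
        state = {}
        for f in self.files + [self.memtable]:  # oldest to newest
            for key, value in f:
                if value is None:
                    state.pop(key, None)
                else:
                    state[key] = value
        return state

    def physical_entries(self):
        # Rough stand-in for what an incremental checkpoint references: all flushed files
        return sum(len(f) for f in self.files)

    def compact(self):
        # Merge everything into one file holding only live entries (tombstones dropped)
        self.flush()
        self.files = [list(self.live_state().items())]


store = ToyLsmStore()
for rnd in range(10):
    for k in range(100):
        store.put((rnd, k), "row")
        if rnd > 0:
            store.delete((rnd - 1, k))  # the previous round's rows expire
    store.flush()

print(len(store.live_state()))   # 100
print(store.physical_entries())  # 1900
store.compact()
print(store.physical_entries())  # 100
```

Each round adds 100 entries and deletes the previous round's 100, so the live state stays at 100 while the physical entries keep accumulating until compaction.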

Moreover, the OOM should not be related to RocksDB, since it uses off-heap native memory; you may need to dig into what is occupying the JVM memory during checkpoints.

Best
Yun Tang
