Hi Yun,
Yes, I do see the log you mentioned below. These are my state declarations:
@StateId("map1")
private final
StateSpec<MapState<String,
HashMap<String,
Object>>>
map1
=
StateSpecs.map();
@StateId("map2")
private final
StateSpec<MapState<String,
HashMap<String,
HashMap<String,
Object >>>>
map2
=
StateSpecs.map();
@StateId("is_state_expiry_timer_set")
private final
StateSpec<ValueState<Boolean>>
isStateExpiryTimerSet
=
StateSpecs.value();
@TimerId("state_expiry")
private final TimerSpec stateExpiry = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
The state size keeps growing exponentially; our checkpoint interval is set to 10 minutes.
Could you share an example of how to set up TTL in case I am missing something? I believe the logic that cleans up expired records is correct; I just want to know whether I am missing any additional components.
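For reference, here is a minimal sketch of how such a processing-time expiry is typically wired in Beam, reusing the names declared above; the 30-minute offset and the omitted element handling are illustrative, not the exact production code:

// Inside the same stateful DoFn as the declarations above.
@ProcessElement
public void processElement(
    ProcessContext ctx,
    @StateId("map1") MapState<String, HashMap<String, Object>> map1,
    @StateId("is_state_expiry_timer_set") ValueState<Boolean> isStateExpiryTimerSet,
    @TimerId("state_expiry") Timer stateExpiry) {

  // ... update map1 / map2 from the incoming element ...

  // Arm the processing-time expiry timer once per key.
  if (!Boolean.TRUE.equals(isStateExpiryTimerSet.read())) {
    stateExpiry.offset(Duration.standardMinutes(30)).setRelative();
    isStateExpiryTimerSet.write(true);
  }
}

@OnTimer("state_expiry")
public void onStateExpiry(
    @StateId("map1") MapState<String, HashMap<String, Object>> map1,
    @StateId("map2") MapState<String, HashMap<String, HashMap<String, Object>>> map2,
    @StateId("is_state_expiry_timer_set") ValueState<Boolean> isStateExpiryTimerSet) {
  // Drop all per-key state once the timer fires.
  map1.clear();
  map2.clear();
  isStateExpiryTimerSet.clear();
}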
Thanks,
Julius
From: Yun Tang <[hidden email]>
Date: Sunday, March 28, 2021 at 3:49 AM
To: "Almeida, Julius" <[hidden email]>, user <[hidden email]>
Cc: Chesnay Schepler <[hidden email]>
Subject: Re: State size increasing exponentially in Flink v1.9
Hi Julius
You could check whether the log "Successfully loaded RocksDB native library" [1] is printed on the task managers, to verify that the RocksDB state backend actually took effect. If the RocksDB state backend is in use, Flink no longer uses the heap keyed state backend, so `CopyOnWriteStateMap` should not exist.
By the way, you have provided very little information. How many states do you use, how is the checkpointed size growing, and have you set up TTL? It is not easy to answer a question without any details.
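For reference, with Flink's own state API, TTL is enabled directly on the state descriptor. I am not sure whether the Beam Flink runner exposes this, so treat the snippet below only as a sketch of what native Flink TTL looks like (the 30-minute value and the descriptor types are just examples):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.minutes(30))                               // example TTL
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)  // refresh TTL on every write
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

MapStateDescriptor<String, String> mapDescriptor =
        new MapStateDescriptor<>("map1", Types.STRING, Types.STRING);
mapDescriptor.enableTimeToLive(ttlConfig);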
Best
Yun Tang
From: Almeida, Julius <[hidden email]>
Sent: Saturday, March 27, 2021 0:40
To: Yun Tang <[hidden email]>; Chesnay Schepler <[hidden email]>
Subject: Re: State size increasing exponentially in Flink v1.9
Hi Yun,
Thanks for the response.
I am using Beam with the Flink 1.9 runner. `CopyOnWriteStateMap` shows up in the heap dump with what look like duplicate values, even though I am using the RocksDB state backend.
We can move to Slack for better communication if you need any more details. I appreciate your help. 🙂
Thanks,
Julius
From: Yun Tang <[hidden email]>
Date: Friday, March 26, 2021 at 6:04 AM
To: Chesnay Schepler <[hidden email]>, "Almeida, Julius" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: State size increasing exponentially in Flink v1.9
Hi,
If you are using the RocksDB state backend, why would `CopyOnWriteStateMap` occur at all?
`CopyOnWriteStateMap` should only exist in the heap-based state backends.
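If no state backend is configured at all, Flink 1.9 falls back to the heap-based MemoryStateBackend, which would also explain the `CopyOnWriteStateMap` entries. You can rule that out by setting `state.backend: rocksdb` in flink-conf.yaml, or explicitly in the job. A minimal sketch for Flink 1.9 follows (the checkpoint URI is a placeholder, and with Beam the backend is usually picked up from the cluster configuration rather than job code):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Placeholder checkpoint URI; 'true' enables incremental checkpoints.
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

    // ... build and execute the job ...
  }
}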
Best
Yun Tang
From: Chesnay Schepler <[hidden email]>
Sent: Friday, March 26, 2021 18:45
To: Almeida, Julius <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: State size increasing exponentially in Flink v1.9
Could you show us how you interact with the map state (ideally the full code of your function that accesses the state)?
On 3/25/2021 1:13 AM, Almeida, Julius wrote:
Hey,
Hope you all are doing well!
I am using Flink 1.9 with the RocksDBStateBackend, but over time the state size is increasing exponentially.
I am using MapState in my project and seeing a memory spike; a heap dump shows duplicates in it.
I also have logic in place to remove expired events from the MapState, e.g. MapState.remove(key).
Can anyone give me pointers on how to investigate this further?
Heap Dump pointed to https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java#L811
Thanks,
Julius