State size increasing exponentially in Flink v1.9

6 messages
State size increasing exponentially in Flink v1.9

Almeida, Julius

Hey,

Hope you all are doing well!

 

I am using Flink v1.9 with the RocksDBStateBackend, but over time the state size is increasing exponentially.

 

I am using MapState in my project and seeing a memory spike; looking at a heap dump, I see duplicate entries in it.

 

I also have logic to remove expired events from the MapState, e.g. MapState.remove(key).

 

Can anyone give me pointers on how to investigate this further?

 

The heap dump pointed to https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java#L811

 

Thanks,

Julius


Re: State size increasing exponentially in Flink v1.9

Chesnay Schepler
Could you show us how you interact with the map state (ideally the full code of your function that accesses the state)?

On 3/25/2021 1:13 AM, Almeida, Julius wrote:




Re: State size increasing exponentially in Flink v1.9

Yun Tang
Hi,

If you are using the RocksDB state backend, why would `CopyOnWriteStateMap` show up at all?

CopyOnWriteStateMap should only exist in heap-based state backends.

Best
Yun Tang





Re: State size increasing exponentially in Flink v1.9

Yun Tang
Hi Julius

You could check whether the log "Successfully loaded RocksDB native library" [1] is printed on the task managers, to verify that the RocksDB state backend is actually taking effect. If the RocksDB state backend is used, Flink does not use the heap keyed state backend at all, so `CopyOnWriteStateMap` should not exist.

BTW, you have provided very little information. How many states do you use, how is the checkpointed size growing, and have you set up TTL? It's not easy to answer a question without any details.
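As a quick sanity check, explicitly selecting the RocksDB backend in Flink 1.9 looks roughly like the sketch below (the checkpoint URI is a placeholder; adjust to your environment):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Second argument enables incremental checkpoints; the URI is a placeholder.
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
```

If the backend is set only via flink-conf.yaml or a runner option, a misconfiguration there would silently fall back to the heap backend, which would explain `CopyOnWriteStateMap` in the heap dump.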


Best
Yun Tang

From: Almeida, Julius <[hidden email]>
Sent: Saturday, March 27, 2021 0:40
To: Yun Tang <[hidden email]>; Chesnay Schepler <[hidden email]>
Subject: Re: State size increasing exponentially in Flink v1.9
 

Hi Yun,

 

Thanks for response.

 

I am using Beam with the Flink v1.9 runner. `CopyOnWriteStateMap` shows up in the heap dump with duplicate values, even though I am using the RocksDB state backend.

 

[Two heap-dump screenshots omitted]

 

We can move to Slack for better communication if any more details are needed. Appreciate your help. 🙂

 

Thanks,

Julius


 


Re: State size increasing exponentially in Flink v1.9

Almeida, Julius

Hi Yun,

 

Yes, I do see the log you mentioned. Here are my state declarations:

 

 

@StateId("map1")
private final StateSpec<MapState<String, HashMap<String, Object>>> map1 = StateSpecs.map();

@StateId("map2")
private final StateSpec<MapState<String, HashMap<String, HashMap<String, Object>>>> map2 = StateSpecs.map();

@StateId("is_state_expiry_timer_set")
private final StateSpec<ValueState<Boolean>> isStateExpiryTimerSet = StateSpecs.value();

@TimerId("state_expiry")
private final TimerSpec stateExpiry = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

 

  1. I remove values from the map that have expired; we check for expired values every 2 hours.
  2. Afterwards I call isStateExpiryTimerSet.clear();
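Stripped of the Beam/Flink APIs, the cleanup in step 1 boils down to iterating the map and dropping entries older than the TTL. A minimal plain-Java sketch of that pattern (class and field names are hypothetical, not from the thread), using an explicit Iterator so removal during iteration is safe:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class ExpiryDemo {
    /**
     * Removes entries whose timestamp is at least ttlMs older than nowMs.
     * An explicit Iterator is used so that removal during iteration is safe.
     */
    public static void removeExpired(Map<String, Long> lastSeen, long nowMs, long ttlMs) {
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() >= ttlMs) {
                it.remove(); // analogous to MapState.remove(key)
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        state.put("old", 0L);
        state.put("fresh", 9_000L);
        removeExpired(state, 10_000L, 7_200L);
        System.out.println(state.keySet()); // only "fresh" survives
    }
}
```

If the real cleanup removes keys from a collection while iterating it without going through an Iterator (or through MapState.remove on the key being iterated), entries can be skipped and accumulate, which would match the observed growth.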

 

The state size keeps growing exponentially; our checkpoint interval is set to 10 minutes.

If I am missing something, can you share an example of setting up TTL? I believe the logic to clean up expired records is correct; I just want to know whether I am missing any additional components.

 

Thanks,

Julius

 


 


Re: State size increasing exponentially in Flink v1.9

Yun Tang
Hi Julius,

It seems you have custom wrappers like `StateSpec` and `MapState` which are not part of the Flink runtime code. I cannot judge whether your usage is correct, since I don't know what your framework does internally.

If you really want TTL with the RocksDB state backend, you could just set the TTL configuration as the official docs describe [1][2], instead of your current manual check-and-remove approach. Cleaning up in the background should give much better performance.

BTW, if the `MapState` you used was just Flink's MapState [3], the correct form would be MapState<String, Object> rather than the strange nested MapState<String, HashMap<String, Object>>.

Moreover, it's better to give the map state value an explicit type instead of just `Object`.
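For reference, attaching TTL to a Flink state descriptor looks roughly like this (a sketch: `MyEvent` is a hypothetical value type, and with the Beam runner the descriptors are created by the runner, so whether you can attach TTL this way depends on your setup):

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(2))                                      // entries expire 2h after last write
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)      // refresh TTL on writes only
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

MapStateDescriptor<String, MyEvent> descriptor =
    new MapStateDescriptor<>("map1", String.class, MyEvent.class);
descriptor.enableTimeToLive(ttlConfig);
```

With this in place, expired entries are dropped by the backend itself and no manual timer-based sweep is needed.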



Best
Yun Tang
