Extracting state keys for a very large RocksDB savepoint


Andrey Bulgakov
Hi all,

I'm trying to use the State Processor API to extract all keys from a RocksDB savepoint produced by an operator in a Flink streaming job into CSV files.

The problem is that the storage size of the savepoint is 30TB, and I'm running into garbage collection issues no matter how much memory, in whatever proportions, or how many CPU cores I allocate to task managers (I tried allocating up to 120GB and 16 cores to each task manager).

The same program and hardware configuration work with no problems for a smaller savepoint (300GB), so this appears to be some sort of scalability issue.

At the beginning, the tasks spend a couple of hours in what I call "the download phase". During that phase, heap usage as indicated by metrics and the Flink UI is at about 10% and everything is going great.

But at a certain point, heap usage for tasks coming out of the download phase starts to go up and climbs to about 87% as indicated in the Flink UI and by the "tm.Status.JVM.Memory.Heap.Used" metric. At that point the heap usage metric stops increasing, and the JVM starts spending a lot of time collecting garbage, keeping all CPUs 100% loaded. After some time in this mode the job crashes with "java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1614821414188_0002_01_000035 timed out."

At all times the indicated managed memory usage is 0%, which seems suspicious since RocksDB is supposed to be using it.

Also, judging by the absence of an application metric I emit in the state processor operator, KeyedStateReaderFunction.readKey never gets called.

I would appreciate it if somebody could help answer some of my questions or suggest a way I could further diagnose/fix this:

1. Is it normal that this overwhelming garbage collection starts long before reaching 100% heap usage? At the time it happens, there's usually 10-15GB of heap showing up as available.

2. Am I correct to assume that even in batch mode Flink implements memory back pressure and is supposed to slow down processing/allocations when it's low on available heap memory?

3. If #2 is true, is it possible that due to some misconfiguration Flink considers more heap space to be available than there actually is and keeps allocating even though there's no more heap?

4. As an alternative to #3, is it possible that there are some unaccounted heap allocations that are not shown in the UI and by the metric and therefore not taken into account by the memory back pressure mechanism?

Here's the minimal code example that demonstrates the issue:
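Roughly, the job looks like the following sketch (the operator uid, key type, and paths below are placeholders, not the real job's values, and running it requires the flink-state-processor-api and RocksDB state backend dependencies):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ExtractKeys {

    // Emits only the key. No state descriptors are registered in open(),
    // since the values are not needed for this job.
    public static class KeyReader extends KeyedStateReaderFunction<String, String> {
        @Override
        public void readKey(String key, Context ctx, Collector<String> out) {
            out.collect(key);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder paths and uid -- substitute the real ones.
        ExistingSavepoint savepoint = Savepoint.load(
                env,
                "hdfs:///savepoints/savepoint-xxxx",
                new RocksDBStateBackend("hdfs:///checkpoints"));

        DataSet<String> keys =
                savepoint.readKeyedState("my-operator-uid", new KeyReader());
        keys.writeAsText("hdfs:///output/keys");
        env.execute("extract-keys");
    }
}
```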

I'm running this on Flink 1.12.2 (and many earlier versions, too) with the following base configuration and a parallelism of 80 (I also tried lowering that to have more resources available per task):

I tried many things with no success:
- reducing parallelism and making more resources available to each task manager
- enabling object reuse and modifying the tuple mapper to avoid extra tuple allocations
- manipulating memory ratios to allocate more memory as heap or managed memory
- allocating 20% of memory for JVM overhead
- switching to the G1 garbage collector

Again, would appreciate any help with this.

--
With regards,
Andrey Bulgakov
Re: Extracting state keys for a very large RocksDB savepoint

Andrey Bulgakov
If anyone is interested, I realized that the State Processor API was not the right tool for this, since it spends a lot of time rebuilding RocksDB tables and then a lot of memory trying to read from them. All I really needed was the operator's keys.

So I used SavepointLoader.loadSavepointMetadata to get KeyGroupsStateHandle objects and built an InputFormat heavily based on the code I found in RocksDBFullRestoreOperation.java.

It ended up working extremely quickly while keeping memory and CPU usage at the minimum.
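In outline, the approach looks like the sketch below. Note that SavepointLoader and the handle classes are Flink-internal APIs, so the names and signatures here are assumptions based on the 1.12-era code and may differ between versions; the actual key deserialization is only described in comments:

```java
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.state.api.runtime.SavepointLoader;

public class SavepointKeyScan {

    public static void scan(String savepointPath) throws Exception {
        // Reads only the _metadata file; no state is downloaded or rebuilt yet.
        CheckpointMetadata metadata =
                SavepointLoader.loadSavepointMetadata(savepointPath);

        for (OperatorState operatorState : metadata.getOperatorStates()) {
            for (OperatorSubtaskState subtask : operatorState.getStates()) {
                for (KeyedStateHandle handle : subtask.getManagedKeyedState()) {
                    if (handle instanceof KeyGroupsStateHandle) {
                        KeyGroupsStateHandle kg = (KeyGroupsStateHandle) handle;
                        try (FSDataInputStream in = kg.openInputStream()) {
                            // Replay the read path of RocksDBFullRestoreOperation:
                            // read the KeyedBackendSerializationProxy header, seek
                            // to each key-group offset from kg.getGroupRangeOffsets(),
                            // and deserialize the key bytes of each entry directly,
                            // without ever instantiating a RocksDB instance.
                        }
                    }
                }
            }
        }
    }
}
```

In an InputFormat, each KeyGroupsStateHandle naturally becomes one input split, so the handles can be read in parallel.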



--
With regards,
Andrey Bulgakov

Re: Extracting state keys for a very large RocksDB savepoint

Tzu-Li (Gordon) Tai
Hi Andrey,

Perhaps the functionality you described is worth adding to the State Processor API. Your observation of how the library currently works is correct: basically, it tries to restore the state backends as is.

Given your current implementation, do you see it as worthwhile to add this?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Extracting state keys for a very large RocksDB savepoint

Andrey Bulgakov
Hi Gordon,

I think my current implementation is very specific and wouldn't be that valuable for the broader public.
But I think there's a potential version of it that could also retrieve values from a savepoint in the same efficient way and that would be something that other people might need.

I'm currently thinking about something similar to KeyedProcessFunction, but taking a single state descriptor as a parameter instead of expecting the user to "register" descriptors in open(). The processElement() method would then be invoked with both the key and the value.

One thing I'm not sure about is MapStateDescriptor, because map state stores compound keys separately, and I'm not sure whether they are stored in sorted order and can be passed to processElement() as a group, or should rather be passed separately.

I'll experiment with this for a while and try to figure out what works. Please let me know if you have thoughts about this.
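In code, the shape I have in mind is roughly this. All names are hypothetical, not an actual Flink API, and Collector is stubbed locally so the sketch is self-contained:

```java
import java.util.ArrayList;
import java.util.List;

public class ReaderSketch {

    // Stand-in for org.apache.flink.util.Collector, to keep the sketch self-contained.
    public interface Collector<T> {
        void collect(T record);
    }

    // Hypothetical reader function: the single state descriptor would be supplied
    // when the read is set up, instead of being registered by the user in open(),
    // and processElement() receives both the key and the deserialized value.
    public interface StateValueReaderFunction<K, V, OUT> {
        void processElement(K key, V value, Collector<OUT> out) throws Exception;
    }

    public static void main(String[] args) throws Exception {
        List<String> results = new ArrayList<>();
        StateValueReaderFunction<String, Long, String> fn =
                (key, value, out) -> out.collect(key + "," + value);

        // A driver would call this once per key/value pair read from the savepoint.
        fn.processElement("user-1", 42L, results::add);
        System.out.println(results);
    }
}
```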



--
With regards,
Andrey Bulgakov

Re: Extracting state keys for a very large RocksDB savepoint

Andrey Bulgakov
I guess there's no point in making it a KeyedProcessFunction, since it's not going to have access to context, timers, or anything like that. So it can be a simple InputFormat returning a DataSet of key/value tuples.



--
With regards,
Andrey Bulgakov