Large rocksdb state restore/checkpoint duration behavior


Aminouvic
Hi,

We are using Flink 1.6.1 on YARN with RocksDB as the state backend, incrementally
checkpointed to HDFS (for both data and timers).

The job reads events from Kafka (~1 billion events per day), builds user
sessions using an EventTimeSessionWindow coupled with a late-firing trigger
and a WindowFunction with AggregatingState (gap of a few minutes, 1 day allowed
lateness, ~1 TB of state), and produces results back into Kafka (~200 million
events per day).

When we restarted the job after maintenance (the cluster was stopped for 1
hour), the restore took several hours.

Task metrics showed that no new data was being read from Kafka, yet the job
was producing output.

Also, the job sometimes seems to freeze (no data in or out) while performing
long checkpoints (~20 minutes).

When we try to cancel the job, it takes several minutes to stop, and the
logs show the following:

2018-10-09 11:53:53,348 WARN  org.apache.flink.runtime.taskmanager.Task - Task 'window-function -> (Sink: kafka-sink, Sink: kafka-late-sink) (1/60)' did not react to cancelling signal for 30 seconds, but is stuck in method:
org.rocksdb.RocksDB.get(Native Method)
org.rocksdb.RocksDB.get(RocksDB.java:810)
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:120)
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:452)
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
java.lang.Thread.run(Thread.java:745)

Any ideas on this?

Regards,
Amine




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Large rocksdb state restore/checkpoint duration behavior

Stefan Richter
Hi,

I would assume that the blocked processing during a checkpoint is caused by [1]: you mentioned RocksDB incremental checkpoints, and you may be using them in combination with heap-based timers. This is the one combination that currently still uses a synchronous checkpointing path for the timers, and if you have many timers, this can block the pipeline.

For the cancellation problem, judging from the stack trace, I would assume the cause is [2]. In a nutshell: when the wall-clock or event time advances, multiple timers can fire (possibly a lot, depending on how big the jump is), and currently this loop does not check the task’s cancellation status; it terminates only when all onTimer calls have been handled.
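
To make the cancellation point concrete, here is a minimal sketch of a timer-firing loop with and without a cancellation check. It is a toy model, not Flink's InternalTimerServiceImpl: the class TimerLoopSketch, its methods, and the "cancel after the third timer" scenario are all hypothetical names and assumptions chosen for illustration.

```java
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;

/** Toy timer service (hypothetical); it only models the loop structure, not Flink internals. */
final class TimerLoopSketch {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    void register(long timestamp) { timers.add(timestamp); }
    void cancel() { cancelled.set(true); }
    int pendingCount() { return timers.size(); }

    /** Fires all timers due at or before the watermark, in timestamp order. */
    void advanceWatermark(long watermark, boolean checkCancellation, LongConsumer onTimer) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            if (checkCancellation && cancelled.get()) {
                return; // stop early; remaining timers stay pending
            }
            onTimer.accept(timers.poll());
        }
    }

    /** Registers 10 timers, requests cancellation during the 3rd callback, returns {fired, pending}. */
    static int[] run(boolean checkCancellation) {
        TimerLoopSketch service = new TimerLoopSketch();
        for (long t = 1; t <= 10; t++) {
            service.register(t);
        }
        int[] fired = {0};
        service.advanceWatermark(10L, checkCancellation, ts -> {
            if (++fired[0] == 3) {
                service.cancel(); // simulated cancel request arriving mid-loop
            }
        });
        return new int[] {fired[0], service.pendingCount()};
    }

    public static void main(String[] args) {
        int[] unchecked = run(false);
        int[] checked = run(true);
        System.out.println("without check: fired=" + unchecked[0] + " pending=" + unchecked[1]);
        System.out.println("with check:    fired=" + checked[0] + " pending=" + checked[1]);
    }
}
```

Without the check, all ten callbacks run even though cancellation was requested during the third; with the check, the loop returns before the fourth. In a real job each callback may also hit RocksDB, which is consistent with the RocksDB.get frame at the top of the stack trace.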

If you have problems with slow savepoints, you can also try restoring from the externalised handle of an incremental checkpoint to see whether that works better.

Best,
Stefan

[2] https://issues.apache.org/jira/browse/FLINK-9845

On 10. Oct 2018, at 12:39, Aminouvic <[hidden email]> wrote:


Re: Large rocksdb state restore/checkpoint duration behavior

Aminouvic
Hello,

Thank you for your answer and apologies for the late response.

For timers we are using:

state.backend.rocksdb.timer-service.factory: rocksdb

Are we still affected by [1]?

As for interruptibility, we have coalesced our timers, and the application
has become more responsive to stop signals.
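
For readers unfamiliar with timer coalescing, here is a minimal sketch of the general idea, not necessarily how it was done in this job: timer timestamps are rounded to a coarser resolution so that many nearby timers collapse into one. The class TimerCoalescingSketch, the 1-second resolution, and the choice to round up (so a timer never fires earlier than requested) are assumptions made for illustration; the TreeSet stands in for a timer service that keeps at most one timer per key and timestamp.

```java
import java.util.TreeSet;

/** Hypothetical helper showing timestamp rounding; it does not use any Flink API. */
final class TimerCoalescingSketch {

    /** Rounds a timestamp up to the next multiple of resolutionMs (resolutionMs must be > 0). */
    static long coalesce(long timestampMs, long resolutionMs) {
        return Math.floorDiv(timestampMs + resolutionMs - 1, resolutionMs) * resolutionMs;
    }

    /** Returns the distinct timer timestamps left after coalescing. */
    static TreeSet<Long> coalesceAll(long[] timestampsMs, long resolutionMs) {
        TreeSet<Long> distinct = new TreeSet<>();
        for (long ts : timestampsMs) {
            distinct.add(coalesce(ts, resolutionMs)); // duplicates collapse in the set
        }
        return distinct;
    }

    public static void main(String[] args) {
        long[] requested = {1001L, 1500L, 1999L, 2000L, 2001L};
        // Five requested timers become two at 1-second resolution.
        System.out.println(coalesceAll(requested, 1000L)); // prints [2000, 3000]
    }
}
```

Fewer distinct timers means fewer callbacks per watermark advance, which would shorten the time a task spends in the timer-firing loop before it can react to a stop signal.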

Also, after long investigations, we've found that we were abusing/misusing
AggregatingState & RocksDB :-/

The pseudocode of our window function looked like the following:

    // ...
    private AggregatingState<IN, OUT> state;
    // ...
    @Override
    public void apply(Tuple4<String, String, String, String> key, TimeWindow window,
                      Iterable<IN> input, Collector<OUT> out) throws Exception {
        // One state.add call per input element
        for (IN element : input) {
            state.add(element);
        }
        out.collect(state.get());
    }
    // ...

Doing so triggers a getInternal/updateInternal call pair on RocksDB inside
state.add for each input item, which puts huge pressure on RocksDB.

We changed the code to iterate over the items inside the AggregateFunction
instead, and to call state.add only once:

    // ...
    private AggregatingState<Iterable<IN>, OUT> state;
    // ...
    @Override
    public void apply(Tuple4<String, String, String, String> key, TimeWindow window,
                      Iterable<IN> input, Collector<OUT> out) throws Exception {
        // A single state.add call for the whole batch
        state.add(input);
        out.collect(state.get());
    }
    // ...

Is this the right way to do it?

Since this modification, the application has been more stable and the
recovery time has dropped to a few minutes.
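
The difference between the two variants above can be illustrated with a small self-contained model. This is only a sketch under simplifying assumptions: CountingSumState is a hypothetical stand-in for a RocksDB-backed AggregatingState that sums Long values and counts one read plus one write per add call; it ignores serialization and everything else the real backend does.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical model comparing per-element and batched state updates. */
final class StateAccessSketch {

    /** Stand-in for backend state: each add() is one read and one write of the accumulator. */
    static final class CountingSumState {
        private long stored = 0L;
        int reads = 0;
        int writes = 0;

        void add(Iterable<Long> values) {
            long acc = stored;   // read the accumulator once
            reads++;
            for (long v : values) {
                acc += v;        // aggregate in memory
            }
            stored = acc;        // write the accumulator once
            writes++;
        }

        long get() {
            reads++;
            return stored;
        }
    }

    /** First variant: one add() per input element. */
    static long perElement(CountingSumState state, List<Long> input) {
        for (Long v : input) {
            state.add(Collections.singletonList(v));
        }
        return state.get();
    }

    /** Second variant: a single add() for the whole window content. */
    static long batched(CountingSumState state, List<Long> input) {
        state.add(input);
        return state.get();
    }

    public static void main(String[] args) {
        List<Long> input = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        CountingSumState a = new CountingSumState();
        CountingSumState b = new CountingSumState();
        System.out.println("per-element: result=" + perElement(a, input)
                + " reads=" + a.reads + " writes=" + a.writes);
        System.out.println("batched:     result=" + batched(b, input)
                + " reads=" + b.reads + " writes=" + b.writes);
    }
}
```

Both variants compute the same result, but in this model the per-element version performs one read and one write per item, while the batched version performs one of each per window firing regardless of how many items the window holds.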

Thank you for your help.

Best regards,
Amine


