External checkpoints not getting cleaned up/discarded - potentially causing high load


Jared Stehler
We’re seeing our external checkpoints directory grow without bound after upgrading to Flink 1.3. We are running Flink on Mesos.

In 1.2 (HA standalone mode), we saw (correctly) that only the latest external checkpoint was being retained, i.e., respecting the state.checkpoints.num-retained default of 1.

The Mesos agent running the JobManager ends up with a really high load and eventually becomes unresponsive. Interestingly, there is not much CPU or memory pressure, which suggests to us that it is IO or network bound, but nothing jumps out at us in iostat/netstat. The biggest difference from 1.2 seems to be that external checkpoints are not getting cleaned up/discarded. What might cause that?

ubuntu@ip-10-80-52-176:/mnt/shared/flink/ext-checkpoints$ top
top - 13:47:41 up 16:31,  1 user,  load average: 25.85, 25.62, 25.43
Tasks: 297 total,   1 running, 296 sleeping,   0 stopped,   0 zombie
%Cpu(s):  0.3 us,  0.0 sy,  0.0 ni, 98.8 id,  0.7 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem:  32948204 total, 23974844 used,  8973360 free,   144572 buffers
KiB Swap:        0 total,        0 used,        0 free.  7752480 cached Mem

We specify Mesos agent attributes to ensure that our Flink containers are allocated to only a subset of the Mesos slaves. However, we do end up with the Flink JobManager container running on the same physical instance as multiple TaskManager containers. We are currently running 65 task managers with 2 slots each and ~70 jobs on the cluster.

We use AWS EFS (https://aws.amazon.com/efs/), mounted on all Mesos boxes, to store the recovery, checkpoint, external checkpoint, and savepoint directories.


        executionEnvironment.enableCheckpointing(TimeUnit.SECONDS.toMillis(30));

        CheckpointConfig config = executionEnvironment.getCheckpointConfig();
        config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        config.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(5));

        executionEnvironment.getConfig().setGlobalJobParameters(params);
        executionEnvironment.getConfig().setAutoWatermarkInterval(watermarkInterval.getValue());
        executionEnvironment.getConfig().setCodeAnalysisMode(CodeAnalysisMode.HINT);

        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // fail the job if it restarts more than 3 times within 2 minutes, with a 1 second delay between attempts
        executionEnvironment.setRestartStrategy(RestartStrategies.failureRateRestart(3,
                Time.minutes(2), Time.seconds(1)));

        executionEnvironment.getConfig().setLatencyTrackingInterval(30000);


Would appreciate any insights you might have on this. 

Thanks

--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703





Re: External checkpoints not getting cleaned up/discarded - potentially causing high load

Stefan Richter
Hi,

I have two quick questions about this problem report:

1) Which state backend are you using?
2) In case you are using RocksDB, did you also activate incremental checkpointing when moving to Flink 1.3?

Another thing that could be really helpful: if possible, could you attach a profiler/sampler to your JobManager and figure out the hotspot methods where most time is spent? That would give us a good starting point for where the problem is potentially caused.

Best,
Stefan


Re: External checkpoints not getting cleaned up/discarded - potentially causing high load

Ufuk Celebi
On Mon, Jul 3, 2017 at 12:02 PM, Stefan Richter
<[hidden email]> wrote:
> Another thing that could be really helpful, if possible, can you attach a
> profiler/sampling to your job manager and figure out the hotspot methods
> where most time is spend? This would be very helpful as a starting point
> where the problem is potentially caused.

A stack trace will also be helpful to see whether some threads are stuck.

If it is possible to run on Mesos without HA mode (@Till: is that possible?), it might be worthwhile to re-run this without HA to get a hint as to whether the issue is related to HA mode.

– Ufuk

Re: External checkpoints not getting cleaned up/discarded - potentially causing high load

pnayak
Hi Stefan and Ufuk

Thank you for your responses...

We are using RocksDB as our state backend. When we upgraded to 1.3 we did not switch to incremental checkpointing (we specify state.backend in flink-conf.yaml) and did not realize we had to initialize RocksDB programmatically to enable incremental checkpoint updates (in case there is a config option that we missed, please do let us know).
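From what we can tell, the programmatic route looks roughly like the sketch below; the wrapper class and the checkpoint URI are just placeholders for illustration, not our actual setup:

import java.io.IOException;

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSetup {

    public static void configure(StreamExecutionEnvironment env) throws IOException {
        // Placeholder checkpoint URI; substitute the real checkpoint directory.
        String checkpointUri = "s3://my-bucket/flink/checkpoints";

        // The second constructor argument enables incremental checkpoints (new in Flink 1.3).
        RocksDBStateBackend backend = new RocksDBStateBackend(checkpointUri, true);
        env.setStateBackend(backend);
    }
}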

Digging into this issue further, we believe that AWS EFS is the source of our problems.

EFS is essentially NFS under the covers...

EFS itself seems to be "ok", in that it is not really choking. However, what seems to be hitting us (and Flink) is the behavior of NFS when lots of small files/updates are happening at once; in this case the NFS mount appears to be unresponsive (https://docs.aws.amazon.com/efs/latest/ug/troubleshooting.html#mount-unresponsive).

Our current thinking is that this behavior is causing the JobManager to "appear" unresponsive, as it is likely trying to perform list operations on the files/sub-directories.

We confirmed this by restricting Flink to a single Mesos agent (with both the JobManager and TaskManagers running on that one agent) and providing a local (EBS) mount for checkpoint/savepoint storage.

At this point, we're going back to using S3. We've also had to explicitly set state.checkpoints.num-retained: 2 in flink-conf.yaml.
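For completeness, the flink-conf.yaml settings we're converging on look roughly like this (a sketch only; the bucket paths are placeholders, not our real ones):

        state.backend: rocksdb
        state.backend.fs.checkpointdir: s3://my-bucket/flink/checkpoints
        state.checkpoints.dir: s3://my-bucket/flink/ext-checkpoints
        state.savepoints.dir: s3://my-bucket/flink/savepoints
        state.checkpoints.num-retained: 2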

Note that we also observed that checkpoints and external checkpoints for failed/cancelled jobs currently don't get cleaned up. We're using a reaper script to do that for now.


Re: External checkpoints not getting cleaned up/discarded - potentially causing high load

Jared Stehler
In reply to this post by Stefan Richter
We are using the RocksDB state backend. We had not activated incremental checkpointing, but in the course of debugging this we ended up doing so, and also moved back to S3 from EFS, as it appeared that EFS was introducing large latencies. I will attempt to provide some profiler data as we analyze further.

--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703




Re: External checkpoints not getting cleaned up/discarded - potentially causing high load

Stefan Richter
Hi Jared,

I just wanted to follow up on the problem you reported. Are there any new insights from your debugging efforts, and does it still exist for you?

Best,
Stefan
