Flink 1.2.0 - Flink Job restart config not using Flink Cluster restart config.

Vera Coberley
Hi all,

We are running Flink 1.2.0. Our flink-conf.yaml is configured to use a default restart-strategy of fixed-delay, with 3 attempts:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3

These settings are echoed by the GlobalConfiguration (see the first set of log statements below). However, the job is submitted with a maxNumberRestartAttempts of Integer.MAX_VALUE (2147483647) instead of 3 (see the second set of log statements).

The job is enabled for checkpointing, and it does not have any job-specific restart strategy defined:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(params.getLong("checkpoint.interval", 3000L)); 

I assumed the default restart configuration would carry over to the job. Am I mistaken in my assumption, do I have a configuration error, or is this a bug?

-- Vera

2017-06-27 19:52:11.288 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: restart-strategy, fixed-delay
2017-06-27 19:52:11.288 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: restart-strategy.fixed-delay.attempts, 3
2017-06-27 19:52:17.642 [flink-akka.actor.default-dispatcher-16] INFO  org.apache.flink.yarn.YarnJobManager  - Submitting job XYZ
2017-06-27 19:52:17.652 [flink-akka.actor.default-dispatcher-16] INFO  org.apache.flink.yarn.YarnJobManager  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=10000) for XYZ
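For reference, 2147483647 in the last log line is Java's Integer.MAX_VALUE, so in practice the job restarts forever. The sketch below is a hypothetical helper, not part of Flink. It compares the two values by their worst-case total waiting time, assuming the 10000 ms delay from the log applies to both cases:

```java
// Hypothetical helper, NOT part of Flink: worst-case cumulative waiting time
// of a fixed-delay restart strategy if every allowed attempt is used.
public class RestartBudget {

    // Multiplying in long arithmetic avoids int overflow for large attempt counts.
    static long totalDelayMs(int maxAttempts, long delayMs) {
        return (long) maxAttempts * delayMs;
    }

    public static void main(String[] args) {
        // Assumption: both strategies use the 10000 ms delay shown in the log.
        long configured = totalDelayMs(3, 10_000L);               // flink-conf.yaml intent
        long observed = totalDelayMs(Integer.MAX_VALUE, 10_000L); // what the JobManager logged
        double years = observed / 1000.0 / 60 / 60 / 24 / 365;
        System.out.println("configured: " + configured + " ms");
        System.out.printf("observed:   %d ms (~%.0f years)%n", observed, years);
    }
}
```

Running it should print 30000 ms for the configured strategy and 21474836470000 ms (~681 years) for the strategy actually used.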

Re: Flink 1.2.0 - Flink Job restart config not using Flink Cluster restart config.

Tzu-Li (Gordon) Tai
Hi Vera,

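Apparently, if there is no job-specific restart strategy, an infinite FixedDelayRestartStrategy is always used for the job submission:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L571-L576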

IMO, this seems to be a bug, as the global restart strategy config should be respected. I’ll get back to this once I confirm this.
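To make the interaction concrete, here is a small Java model of the two steps as described in this thread. It is a hypothetical sketch, not Flink code: the class and method names are invented, and the 10000 ms delay is taken from Vera's log. The client-side step mimics the check in StreamingJobGraphGenerator; the JobManager-side step falls back to the cluster default only when the job graph carries no strategy.

```java
// Hypothetical model of the behaviour discussed in this thread; NOT Flink code.
public class RestartResolutionModel {

    // Stand-in for FixedDelayRestartStrategy(maxNumberRestartAttempts, delayBetweenRestartAttempts).
    static final class Strategy {
        final int maxAttempts;
        final long delayMs;

        Strategy(int maxAttempts, long delayMs) {
            this.maxAttempts = maxAttempts;
            this.delayMs = delayMs;
        }
    }

    // Client side (job graph generation): with checkpointing on and no job-specific
    // strategy, an infinite fixed-delay strategy is filled in before submission.
    static Strategy generateJobGraph(boolean checkpointing, Strategy jobStrategy) {
        if (checkpointing && jobStrategy == null) {
            return new Strategy(Integer.MAX_VALUE, 10_000L);
        }
        return jobStrategy;
    }

    // JobManager side: the flink-conf.yaml default applies only if the submitted
    // job graph carries no strategy at all.
    static Strategy resolveOnJobManager(Strategy submitted, Strategy clusterDefault) {
        return submitted != null ? submitted : clusterDefault;
    }

    public static void main(String[] args) {
        // restart-strategy.fixed-delay.attempts: 3 (the 10000 ms delay is an assumption)
        Strategy cluster = new Strategy(3, 10_000L);
        System.out.println(resolveOnJobManager(generateJobGraph(true, null), cluster).maxAttempts);
        System.out.println(resolveOnJobManager(generateJobGraph(false, null), cluster).maxAttempts);
    }
}
```

With checkpointing enabled the model prints 2147483647; with it disabled it prints 3, the value from flink-conf.yaml.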

Regards,
Gordon

Re: Flink 1.2.0 - Flink Job restart config not using Flink Cluster restart config.

Ufuk Celebi
Hey Vera and Gordon,

I agree that this behaviour is confusing.

If we want to split hairs here, we wouldn't call it a bug, because the
restart strategy docs say that it is the "Default restart strategy to use in
case no restart strategy has been specified for the job". The
confusing part is that enabling checkpointing sets a job-specific restart
strategy that overrides the default configuration.

We would need to specify the restart strategy on the JobManager,
because the client, which runs the job graph generator, doesn't have
access to the cluster config. If we change this, we have to think
about how to do it without breaking the behaviour of existing programs.
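One possible shape of such a change is sketched below: the fallback moves from the client into the JobManager, where the cluster config is available. This is a hypothetical design sketch, not Flink's implementation; the names and the precedence order are assumptions. It keeps the implicit infinite strategy only for clusters without an explicit restart-strategy, so jobs like Vera's would change behaviour, which is exactly the compatibility question raised above.

```java
// Hypothetical design sketch, NOT Flink's implementation: resolve the restart
// strategy on the JobManager, where flink-conf.yaml is available.
public class JobManagerSideResolution {

    // Stand-in for a fixed-delay strategy; maxAttempts == 0 means "no restarts".
    static final class Strategy {
        final int maxAttempts;
        final long delayMs;

        Strategy(int maxAttempts, long delayMs) {
            this.maxAttempts = maxAttempts;
            this.delayMs = delayMs;
        }
    }

    static final Strategy NO_RESTART = new Strategy(0, 0L);

    // jobStrategy: set explicitly in the program, or null.
    // clusterDefault: from flink-conf.yaml, or null if restart-strategy is not configured.
    static Strategy resolve(Strategy jobStrategy, boolean checkpointing, Strategy clusterDefault) {
        if (jobStrategy != null) {
            return jobStrategy;                              // explicit job setting wins
        }
        if (clusterDefault != null) {
            return clusterDefault;                           // respect the cluster config
        }
        if (checkpointing) {
            return new Strategy(Integer.MAX_VALUE, 10_000L); // old implicit behaviour
        }
        return NO_RESTART;
    }

    public static void main(String[] args) {
        Strategy cluster = new Strategy(3, 10_000L);
        // Vera's setup: checkpointing on, no job strategy, cluster default of 3 attempts.
        System.out.println(resolve(null, true, cluster).maxAttempts);
        // Cluster without restart-strategy: the old behaviour is preserved.
        System.out.println(resolve(null, true, null).maxAttempts);
    }
}
```

The first line printed is 3 and the second is 2147483647.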

@Vera: As a workaround you could enable checkpointing and afterwards
explicitly clear the job's restart strategy via
ExecutionConfig.setRestartStrategy(null). Then the cluster default
should be picked up.

– Ufuk

On Thu, Jun 29, 2017 at 8:37 AM, Tzu-Li (Gordon) Tai
<[hidden email]> wrote:

> Hi Vera,
>
> Apparently, if there is no job-specific restart strategy, an infinite
> FixedDelayRestartStrategy is always used for the job submission:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L571-L576
>
> IMO, this seems to be a bug, as the global restart strategy config should be
> respected. I’ll get back to this once I confirm this.
>
> Regards,
> Gordon

Re: Flink 1.2.0 - Flink Job restart config not using Flink Cluster restart config.

Ufuk Celebi
On Thu, Jun 29, 2017 at 8:59 AM, Ufuk Celebi <[hidden email]> wrote:
> @Vera: As a workaround you could enable checkpointing and afterwards
> explicitly clear the job's restart strategy via
> ExecutionConfig.setRestartStrategy(null). Then the cluster default
> should be picked up.

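@Vera: Sorry, I just checked Gordon's code reference and saw that my
suggestion wouldn't work there: the job graph generator replaces a missing
(null) restart strategy with the infinite one whenever checkpointing is
enabled. So I don't think that there is a good workaround here...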
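A minimal model of why the null workaround fails: the check happens later, when the job graph is generated, and at that point a strategy reset to null looks exactly like one that was never set. The JobConfig class and its methods are hypothetical stand-ins, not Flink's ExecutionConfig API. In this model the only job-side way to get 3 attempts is to set them explicitly. In a real job, that would mean duplicating the cluster setting in the program, e.g. with env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS))).

```java
// Hypothetical model, NOT Flink code: why setRestartStrategy(null) after
// enableCheckpointing() does not bring back the cluster default.
public class NullWorkaroundModel {

    // Minimal stand-in for the job-side settings involved.
    static final class JobConfig {
        boolean checkpointing;
        Integer restartAttempts; // null means "no job-specific strategy"

        void enableCheckpointing() { checkpointing = true; }

        void setRestartAttempts(Integer attempts) { restartAttempts = attempts; }
    }

    // Runs at job graph generation, after all user calls, so their order is irrelevant:
    // a strategy reset to null is indistinguishable from one that was never set.
    static Integer attemptsAfterGraphGeneration(JobConfig cfg) {
        if (cfg.checkpointing && cfg.restartAttempts == null) {
            return Integer.MAX_VALUE;
        }
        return cfg.restartAttempts;
    }

    public static void main(String[] args) {
        JobConfig workaround = new JobConfig();
        workaround.enableCheckpointing();
        workaround.setRestartAttempts(null); // the suggested workaround
        System.out.println(attemptsAfterGraphGeneration(workaround));

        JobConfig explicit = new JobConfig();
        explicit.enableCheckpointing();
        explicit.setRestartAttempts(3);      // duplicate the cluster setting in the job
        System.out.println(attemptsAfterGraphGeneration(explicit));
    }
}
```

The first configuration still ends up with 2147483647 attempts; only the explicit setting yields 3.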