Hi all,
We are running Flink 1.2.0. Our flink-conf.yaml is configured to use a default restart-strategy of fixed-delay, with 3 attempts:

    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3

These settings are echoed by the GlobalConfiguration (see first set of log statements). However, the job is submitted with a maxNumberRestartAttempts of Max INT instead of 3 (see second set of log statements).

The job is enabled for checkpointing, and it does not have any job-specific restart strategy defined:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(params.getLong("checkpoint.interval", 3000L));

I assumed the default restart configuration would carry over to the job. Am I mistaken in my assumption, do I have a configuration error, or is this a bug?

-- Vera

    2017-06-27 19:52:11.288 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: restart-strategy, fixed-delay
    2017-06-27 19:52:11.288 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: restart-strategy.fixed-delay.attempts, 3

    2017-06-27 19:52:17.642 [flink-akka.actor.default-dispatcher-16] INFO org.apache.flink.yarn.YarnJobManager - Submitting job XYZ
    2017-06-27 19:52:17.652 [flink-akka.actor.default-dispatcher-16] INFO org.apache.flink.yarn.YarnJobManager - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=10000) for XYZ
Hi Vera,

Apparently, if there is no job-specific restart strategy, an infinite FixedDelayRestartStrategy is always used for the job submission:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L571-L576

IMO, this seems to be a bug, as the global restart strategy config should be respected. I’ll get back to this once I have confirmed it.

Regards,
Gordon

On 28 June 2017 at 10:22:37 PM, Vera Coberley ([hidden email]) wrote:
Hey Vera and Gordon,
I agree that this behaviour is confusing. If we want to split hairs, we wouldn't call it a bug, because the restart strategy docs describe the setting as the "Default restart strategy to use in case no restart strategy has been specified for the job". The confusing part is that enabling checkpoints sets a restart strategy on the job, and that strategy overrides the default configuration.

To respect the cluster default, we would need to determine the restart strategy on the job manager, because the client, which runs the job graph generator, doesn't have access to the cluster config. If we change this, we have to think about how to do it without breaking the behaviour of existing programs.

@Vera: As a workaround you could enable checkpointing and afterwards explicitly disable restarts via ExecutionConfig.setRestartStrategy(null). Then the cluster default should be picked up.

– Ufuk

On Thu, Jun 29, 2017 at 8:37 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
> Hi Vera,
>
> Apparently, if there is no job-specific restart strategy, an infinite
> FixedDelayRestartStrategy is always used for the job submission:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L571-L576
>
> IMO, this seems to be a bug, as the global restart strategy config should be
> respected. I’ll get back to this once I confirm this.
>
> Regards,
> Gordon
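To make the interaction described above concrete, here is a minimal, self-contained sketch of the decision logic as it is explained in this thread. It is a toy model, not Flink's code: the class `RestartStrategyModel`, the type `FixedDelay`, and the methods `clientSide` and `effective` are all hypothetical names, and the cluster default's delay of 10000 ms is an assumption (only the attempts value of 3 comes from Vera's config). The 10000 ms delay on the client-side fallback and the Integer.MAX_VALUE attempts are taken from the log output.

```java
import java.util.Optional;

/** Toy model of the behaviour discussed in this thread; NOT Flink's actual classes. */
final class RestartStrategyModel {

    /** Hypothetical stand-in for a fixed-delay restart configuration. */
    static final class FixedDelay {
        final int attempts;
        final long delayMillis;

        FixedDelay(int attempts, long delayMillis) {
            this.attempts = attempts;
            this.delayMillis = delayMillis;
        }

        @Override
        public String toString() {
            return "FixedDelay(attempts=" + attempts + ", delayMillis=" + delayMillis + ")";
        }
    }

    // Delay reported in the log line (delayBetweenRestartAttempts=10000).
    static final long FALLBACK_DELAY_MILLIS = 10_000L;

    /**
     * Client side (job graph generation): the client cannot see the cluster config.
     * If the user set no strategy but enabled checkpointing, an "infinite"
     * fixed-delay strategy is attached to the job.
     */
    static Optional<FixedDelay> clientSide(boolean checkpointingEnabled, FixedDelay userStrategy) {
        if (userStrategy != null) {
            return Optional.of(userStrategy);
        }
        if (checkpointingEnabled) {
            return Optional.of(new FixedDelay(Integer.MAX_VALUE, FALLBACK_DELAY_MILLIS));
        }
        return Optional.empty();
    }

    /** Cluster side: a job-specific strategy wins; the default applies only if none is attached. */
    static FixedDelay effective(Optional<FixedDelay> jobStrategy, FixedDelay clusterDefault) {
        return jobStrategy.orElse(clusterDefault);
    }

    public static void main(String[] args) {
        // attempts=3 as in Vera's flink-conf.yaml; the delay value is assumed.
        FixedDelay clusterDefault = new FixedDelay(3, 10_000L);

        System.out.println("checkpointing, no job strategy:    "
                + effective(clientSide(true, null), clusterDefault));
        System.out.println("no checkpointing, no job strategy: "
                + effective(clientSide(false, null), clusterDefault));
        System.out.println("checkpointing, explicit strategy:  "
                + effective(clientSide(true, new FixedDelay(3, 10_000L)), clusterDefault));
    }
}
```

In this model, the first case reproduces what Vera sees: by the time the job reaches the cluster it already carries a strategy, so the configured default of 3 attempts is never consulted. The third case only illustrates that a strategy set explicitly on the job takes precedence in the model; whether that is acceptable for a given job is a separate question.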
On Thu, Jun 29, 2017 at 8:59 AM, Ufuk Celebi <[hidden email]> wrote:
> @Vera: As a workaround you could enable checkpointing and afterwards
> explicitly disable restarts via
> ExecutionConfig.setRestartStrategy(null). Then the cluster default
> should be picked up.

@Vera: Sorry, I just checked Gordon's code reference and saw that my suggestion wouldn't work there. So I don't think that there is a good workaround here...