Hello,

Looking at the existing restart strategies, they are fairly generic. We have a requirement to restart the job only in case of specific exceptions or issues. What would be the best way to build a restart strategy based on a few rules, such as checking for a particular exception type or some extra, application-specific conditions?

For background, the specific issue that prompted this requirement is slots not getting released when a job finishes. In our application, we keep track of submitted jobs along with the parallelism allotted to each. Once a job finishes, we assume its slots are free and try to submit the next set of jobs, which at times fails with the error “not enough slots available”. We think a job restart could solve this issue, but we only want to restart when this particular situation is encountered. Please let us know if there are better ways to solve this problem than a restart strategy.
Thanks,
Kasif
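The rule-based decision described above can be sketched independently of any Flink API. `RestartRules` below is a hypothetical helper, not a Flink class: it returns true when any configured rule (an exception type, a message fragment, or an arbitrary application-specific check) matches the failure or something in its cause chain. The message text "not enough slots available" is taken from the description above and may not match Flink's exact wording, which is why matching is case-insensitive in this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical helper, not part of Flink: collects restart rules and
// reports whether any of them matches a given failure.
final class RestartRules {
    private final List<Predicate<Throwable>> rules = new ArrayList<>();

    // Rule: some throwable in the cause chain is an instance of the given type.
    RestartRules onCauseType(Class<? extends Throwable> type) {
        rules.add(failure -> anyInChain(failure, type::isInstance));
        return this;
    }

    // Rule: some throwable in the cause chain has a message containing the text
    // (case-insensitive, since the exact error wording is an assumption).
    RestartRules onMessageContaining(String text) {
        String needle = text.toLowerCase(Locale.ROOT);
        rules.add(failure -> anyInChain(failure, t -> t.getMessage() != null
                && t.getMessage().toLowerCase(Locale.ROOT).contains(needle)));
        return this;
    }

    // Rule: an arbitrary application-specific check on the top-level failure.
    RestartRules when(Predicate<Throwable> check) {
        rules.add(check);
        return this;
    }

    // Restart only if at least one rule matches.
    boolean shouldRestart(Throwable failure) {
        for (Predicate<Throwable> rule : rules) {
            if (rule.test(failure)) {
                return true;
            }
        }
        return false;
    }

    // Walks the cause chain, stopping if a cycle is detected.
    private static boolean anyInChain(Throwable failure, Predicate<Throwable> matcher) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = failure; t != null && seen.add(t); t = t.getCause()) {
            if (matcher.test(t)) {
                return true;
            }
        }
        return false;
    }
}
```

The hard part is wiring such a predicate into Flink itself, since the restart strategy has to be given the failure cause in order to evaluate it.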
Hi Kasif,

I think in this situation it is best to define your own custom RestartStrategy. To do so, specify a class that has a static `RestartStrategyFactory createFactory(Configuration configuration)` method via `restart-strategy: MyRestartStrategyFactoryFactory` in `flink-conf.yaml`.

Cheers,
Till

On Thu, Nov 22, 2018 at 7:18 AM Ali, Kasif <[hidden email]> wrote:
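As a sketch of the configuration Till describes, assuming a made-up package `com.example.flink` and the placeholder class name from his message, the `flink-conf.yaml` entry would look like this:

```yaml
# flink-conf.yaml (sketch; the package name is hypothetical)
# The named class must provide a public static
# createFactory(Configuration) method returning a RestartStrategyFactory.
restart-strategy: com.example.flink.MyRestartStrategyFactoryFactory
```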
Hi Till,
Sorry to resurface an ancient question, but is there a working example anywhere of setting a custom restart strategy?

I'm asking because I've been wandering through the Flink 1.9 code base for a while, and the restart strategy implementation is… pretty tangled.

From what I've been able to figure out, you have to provide a factory class, something like this:

    Configuration config = new Configuration();
    config.setString(ConfigConstants.RESTART_STRATEGY, MyRestartStrategyFactory.class.getCanonicalName());
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(4, config);

That factory class should extend RestartStrategyFactory, but it also needs to implement a static method that looks like this:

    public static MyRestartStrategyFactory createFactory(Configuration config) {
        return new MyRestartStrategyFactory();
    }

I wasn't able to find any documentation that mentions this particular method being a requirement. Also, the documentation at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#fault-tolerance doesn't mention that you can set a custom class name for restart-strategy.

Thanks,
— Ken
--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
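Why the static method is required can be illustrated with a small, self-contained sketch of reflective loading. As far as we can tell from the 1.9 sources, Flink's `RestartStrategyFactory.createRestartStrategyFactory` handles a non-built-in `restart-strategy` value roughly this way: load the class by name, look up a static `createFactory(Configuration)` method, and invoke it. `Config`, `StrategyFactory`, and `FactoryLoader` below are hypothetical stand-ins so the sketch runs without Flink on the classpath.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flink's Configuration.
class Config {
    final Map<String, String> values = new HashMap<>();
}

// Hypothetical stand-in for Flink's RestartStrategyFactory.
abstract class StrategyFactory {
    abstract String describe();
}

// A custom factory. The loader never calls the constructor directly;
// it only looks for the public static createFactory(Config) method.
class MyRestartStrategyFactory extends StrategyFactory {
    public static MyRestartStrategyFactory createFactory(Config config) {
        return new MyRestartStrategyFactory();
    }

    @Override
    String describe() {
        return "custom";
    }
}

// Sketch of the reflective lookup keyed by the restart-strategy value.
final class FactoryLoader {
    static final String CREATE_METHOD = "createFactory";

    static StrategyFactory load(Config config) {
        String className = config.values.get("restart-strategy");
        try {
            Class<?> clazz = Class.forName(className);
            Method method = clazz.getMethod(CREATE_METHOD, Config.class);
            // null receiver: this only works because the method is static.
            return (StrategyFactory) method.invoke(null, config);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create restart strategy factory from " + className, e);
        }
    }
}
```

In this sketch, a class that lacks the static method is rejected with an exception, even if it has a usable constructor.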
Hi Till,
Sorry, I missed the key question… In the RestartStrategy.restart() method, I don't see any good way to get at the underlying exception. I can cast the RestartCallback to an ExecutionGraphRestartCallback, but I'd still need access to its private execGraph field to get at the failure info. Is there some other way for the restart handler to get at this?

And yes, I meant to note that you'd mentioned the required static method in your email; I was asking about documentation for it.

Thanks,
— Ken
Hi Ken,

Custom restart strategies were an experimental feature and have been deprecated since 1.10 [1]. That's why you cannot find any documentation for them.

The old RestartStrategy was deprecated and replaced by RestartBackoffTimeStrategy in 1.10 (unless you are using the legacy scheduler, which was also deprecated). The new restart strategy, RestartBackoffTimeStrategy, is aware of the exact failure cause. However, it does not support customization at the moment.

Your requirement sounds reasonable to me, and I think custom (new) restart strategies could be supported later.

Thanks,
Zhu Zhu

On Wed, May 13, 2020 at 7:34 AM Ken Krugler <[hidden email]> wrote:
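To make the difference concrete: as we read the 1.10 sources, `RestartBackoffTimeStrategy` exposes `canRestart()`, `getBackoffTime()` and `notifyFailure(Throwable cause)`, so the strategy receives the failure before deciding. The sketch below defines a local interface with that shape (`BackoffStrategy` is a hypothetical name, not a Flink type) and a strategy that restarts only for a chosen exception type, up to a maximum number of attempts. Because Flink does not load custom implementations at this point, this only illustrates the logic such a strategy could contain.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Local interface with the same shape as Flink's RestartBackoffTimeStrategy
// (hypothetical name; defined here so the sketch compiles on its own).
interface BackoffStrategy {
    boolean canRestart();

    long getBackoffTime();

    void notifyFailure(Throwable cause);
}

// Restarts only when the latest failure has a cause of restartableType in its
// chain, and only while the number of failures stays within maxAttempts.
final class ExceptionTypeRestartStrategy implements BackoffStrategy {
    private final Class<? extends Throwable> restartableType;
    private final int maxAttempts;
    private final long backoffMillis;
    private int failureCount = 0;
    private boolean lastFailureRestartable = false;

    ExceptionTypeRestartStrategy(Class<? extends Throwable> restartableType, int maxAttempts, long backoffMillis) {
        this.restartableType = restartableType;
        this.maxAttempts = maxAttempts;
        this.backoffMillis = backoffMillis;
    }

    @Override
    public void notifyFailure(Throwable cause) {
        failureCount++;
        lastFailureRestartable = hasCauseOfType(cause, restartableType);
    }

    @Override
    public boolean canRestart() {
        return lastFailureRestartable && failureCount <= maxAttempts;
    }

    @Override
    public long getBackoffTime() {
        return backoffMillis;
    }

    // Walks the cause chain, guarding against cycles.
    private static boolean hasCauseOfType(Throwable failure, Class<? extends Throwable> type) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = failure; t != null && seen.add(t); t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }
}
```

Counting every notified failure, not only matching ones, is a simplification: a non-matching failure already ends the job, so in practice only matching failures accumulate.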
Yes, you are right, Zhu Zhu. Extending the RestartBackoffTimeStrategyFactoryLoader to also load custom RestartBackoffTimeStrategies sounds like a good improvement for the future.

[hidden email], unfortunately the old RestartStrategy interface did not provide the cause of the failure.

Cheers,
Till

On Wed, May 13, 2020 at 7:55 AM Zhu Zhu <[hidden email]> wrote:
Ticket FLINK-17714 has been created to track this requirement.

Thanks,
Zhu Zhu

On Wed, May 13, 2020 at 8:30 PM Till Rohrmann <[hidden email]> wrote: