My Flink job loads several configuration files that contain job, operator and business configuration. One of the operators is an AsyncOperator with a function like so:

    class AsyncFun(config: T) extends RichAsyncFunction[X, Y] {
      @transient private lazy val client = f(config, metricGroup, etc.)
      @transient private lazy val metricGroup = ...

      def asyncInvoke(....)
    }

The variables are declared lazily as an alternative to implementing the open method. This deferred initialization is unavoidable because we rely on Flink's monitoring libraries, so the client cannot be built in the constructor.

The application resumes from a checkpoint after an unexpected termination. However, sometimes I want to change the config parameter that's passed as a constructor argument. That doesn't work, because Flink tries to restore from the submittedJobGraph. This makes sense: Flink by itself doesn't know whether it's recovering from an abrupt termination (and must therefore rely on the old config to build the client) or whether it should start afresh.

I want to know what options we have to allow for configuration changes (i.e. re-initializing the operators):

1. Is there any way to restore from a checkpoint and also recreate the client using the newer configuration?
2. If we take a savepoint (drain and save) and then resume the job, will the configuration changes take effect?
3. Will we have to move away from Flink monitoring so that the client can be initialized inside the constructor?
4. One option is to remove the constructor argument entirely and load the config inside the open method. How can this be done without exposing the entire application configuration? I could store the configuration in the job parameters (by somehow converting this object to a map, which I'd rather not do), but how do I load it back, given that this operator function is used by multiple operators?
5. Any other option? For functions that aren't AsyncFunctions, is leveraging BroadcastState the only way to dynamically update configuration?

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
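For option 4, one way to avoid giving every operator the whole application configuration is to flatten each operator's settings under its own prefix and pass only that prefix to the constructor. The sketch below is framework-free Scala. `ClientConfig`, `AsyncFunSketch` and the prefixes are hypothetical, a plain `Map[String, String]` stands in for the job parameters, and `open(...)` stands in for `RichAsyncFunction.open`. In an actual job, you could wrap the flattened map with `ParameterTool.fromMap` (after `.asJava`) and register it via `env.getConfig.setGlobalJobParameters(...)`. You would then read it back in `open` through `getRuntimeContext.getExecutionConfig.getGlobalJobParameters.toMap`. Treat that wiring as an assumption to check against your Flink version.

```scala
final case class ClientConfig(endpoint: String, timeoutMs: Int, maxRetries: Int)

object ClientConfig {
  /** Flattens one operator's settings under its own prefix, e.g. "enrich.endpoint". */
  def toParams(prefix: String, c: ClientConfig): Map[String, String] = Map(
    s"$prefix.endpoint" -> c.endpoint,
    s"$prefix.timeoutMs" -> c.timeoutMs.toString,
    s"$prefix.maxRetries" -> c.maxRetries.toString
  )

  /** Reads back only the keys under `prefix`; all other entries are ignored. */
  def fromParams(prefix: String, params: Map[String, String]): ClientConfig = {
    def get(key: String): String =
      params.getOrElse(s"$prefix.$key",
        throw new NoSuchElementException(s"missing parameter $prefix.$key"))
    ClientConfig(get("endpoint"), get("timeoutMs").toInt, get("maxRetries").toInt)
  }
}

/** Hypothetical stand-in for AsyncFun: the constructor only captures the prefix. */
class AsyncFunSketch(prefix: String) extends Serializable {
  @transient private var config: ClientConfig = _

  /** Stand-in for open(): `globalParams` would come from the global job parameters. */
  def open(globalParams: Map[String, String]): Unit =
    config = ClientConfig.fromParams(prefix, globalParams)

  def currentConfig: ClientConfig = config
}

object PrefixDemo extends App {
  val params =
    ClientConfig.toParams("enrich", ClientConfig("http://a", 500, 3)) ++
      ClientConfig.toParams("lookup", ClientConfig("http://b", 200, 1))
  val op = new AsyncFunSketch("lookup")
  op.open(params)
  println(op.currentConfig) // ClientConfig(http://b,200,1)
}
```

The same function class can back several operators, each constructed with a different prefix. Each instance only ever reads its own slice of the parameters.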
Will this work? In the main method, serialize the config into a string and store it using ParameterTool, with the task name as the key and the serialized config as the value. Then, in the open method, look up the relevant configuration using getTaskName().

A follow-up to this would be configuring custom windowing functions. I have a size- as well as time-based window class whose size and time limits are configurable and passed as constructor arguments. How do I change this configuration when state persistence/recovery is enabled? A window doesn't have an open method per se.
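The proposal above can be sketched in plain Scala. This is a sketch under stated assumptions, not Flink API usage. `EnrichConfig`, `ConfigCodec` and `ConfiguredFunction` are hypothetical, and the parameter map is a plain `Map[String, String]`. The config object becomes a string through Java serialization plus Base64, so it never has to be split into individual map entries.

One deliberate deviation from the proposal: the lookup key is an explicit name passed to the constructor rather than `getTaskName()`. The task name reported at runtime may be the name of a whole operator chain rather than of a single operator, which makes it a fragile key. Decoding takes a `ClassLoader`. Inside Flink, `getRuntimeContext.getUserCodeClassLoader` is the natural candidate, since user classes may not be visible to Java serialization's default lookup.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}
import java.util.Base64

final case class EnrichConfig(endpoint: String, batchSize: Int)

object ConfigCodec {
  /** Java-serializes the config and Base64-encodes it so it fits a String-valued map. */
  def encode(value: java.io.Serializable): String = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    Base64.getEncoder.encodeToString(bytes.toByteArray)
  }

  /** Decodes a value produced by `encode`, resolving classes through `loader` first. */
  def decode[T](encoded: String, loader: ClassLoader): T = {
    val raw = Base64.getDecoder.decode(encoded)
    val in = new ObjectInputStream(new ByteArrayInputStream(raw)) {
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, loader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

/** Hypothetical function: only `configKey` is captured in the job graph. */
class ConfiguredFunction(configKey: String) extends Serializable {
  @transient private var cfg: EnrichConfig = _

  /** Stand-in for open(): look up this operator's entry and decode it. */
  def open(params: Map[String, String], loader: ClassLoader): Unit =
    cfg = ConfigCodec.decode[EnrichConfig](params(configKey), loader)

  def config: EnrichConfig = cfg
}

object CodecDemo extends App {
  val params = Map("enrich-v1" -> ConfigCodec.encode(EnrichConfig("http://svc", 50)))
  val fn = new ConfiguredFunction("enrich-v1")
  fn.open(params, classOf[EnrichConfig].getClassLoader)
  println(fn.config) // EnrichConfig(http://svc,50)
}
```

As far as I can tell, values stored through ParameterTool and the global job parameters are part of the submitted job graph, just like constructor arguments. An automatic recovery would therefore bring back the old values. New values would take effect only when the job is resubmitted, for example from a savepoint.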
Hi,
I gave some answers in the other mail thread. Some additional comments:

In general, I think even configuration can be considered as state in this case. If the state is not set, the job can be considered a fresh start. Once the state is set, it would basically be just a configuration update.

You could set timers to periodically query either an external service or just a distributed file system for an updated version of the configuration. If you want to avoid excessive state access, you could also cache the state's content in a transient variable.

Regards,
Timo

On 16.12.20 21:50, vishalovercome wrote:
> Will this work - In main method, serialize config into a string and store it
> using ParameterTool with key as taskName and value as config (serialized as
> string). Then in the open method, lookup the relevant configuration using
> getTaskName().
>
> A follow up to this would be configuring custom windowing functions. I have
> a size as well as a time based window class where size and time limits are
> configurable and passed as constructor arguments. How to change this
> configuration when state persistence/recovery is enabled? A window doesn't
> have an open method per se
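Timo's suggestion of treating configuration as state can be modelled without Flink to show the control flow. All names in this sketch are hypothetical:

- `state` stands in for a `ValueState` that survives checkpoint/restore.
- `cache` stands in for a transient field.
- `onTimer` stands in for a timer callback and returns when the next timer should fire.
- `fetchLatest` stands in for a read from an external service or a distributed file system.

A version number decides whether a fetched configuration is newer than the stored one. Timers in Flink are tied to keyed streams, so in a real job this logic would more likely sit in something like a `KeyedProcessFunction`, with state and timers per key.

```scala
/** A configuration value tagged with a monotonically increasing version. */
final case class Versioned[C](version: Long, value: C)

class StatefulConfig[C](fetchLatest: () => Versioned[C], refreshIntervalMs: Long) {
  // Stand-in for a ValueState[Versioned[C]]: survives checkpoint/restore.
  private var state: Option[Versioned[C]] = None
  // Stand-in for a @transient field: lost on restore, rebuilt from `state`.
  @transient private var cache: Option[Versioned[C]] = None

  /** Current config; if neither cache nor state holds one, treat it as a fresh start. */
  def current(): C = {
    if (cache.isEmpty) cache = state
    if (cache.isEmpty) update(fetchLatest())
    cache.get.value
  }

  /** Timer callback: keep the fetched config only if it is newer than the stored one,
    * and return the timestamp at which the next timer should be registered. */
  def onTimer(now: Long): Long = {
    val latest = fetchLatest()
    if (state.forall(_.version < latest.version)) update(latest)
    now + refreshIntervalMs
  }

  /** Simulates recovery: the transient cache is gone, the state is restored. */
  def simulateRestore(): Unit = cache = None

  private def update(v: Versioned[C]): Unit = { state = Some(v); cache = Some(v) }
}
```

After a restore, the configuration held in state is used until the next timer fires. This matches the point that, once the state is set, picking up new configuration is just an update rather than a restart.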