Changing application configuration when restoring from checkpoint/savepoint

vishalovercome
My Flink job loads several configuration files that contain job, operator,
and business configuration. One of the operators is an AsyncOperator whose
function looks like this:

class AsyncFun(config: T) extends RichAsyncFunction[X, Y] {
   // Declared lazily as an alternative to implementing open()
   @transient private lazy val client = f(config, metricGroup, etc.)
   @transient private lazy val metricGroup = ...

   override def asyncInvoke(....)
}

The variables are declared lazily as an alternative to implementing the open
method. This is unavoidable because we rely on Flink's monitoring libraries.
The application resumes from a checkpoint after an unexpected termination.
However, sometimes I want to change the config parameter that is passed as a
constructor argument, and that doesn't work because Flink restores from the
submittedJobGraph. This makes sense: Flink by itself doesn't know whether
it's recovering from an abrupt termination, and must therefore rely on the
old config to build the client, or whether it should start afresh. What
options do we have to allow for configuration changes (i.e. re-initializing
the operators)?

1. Is there any way to restore from a checkpoint and still recreate the
client using newer configuration?
2. If we take a savepoint (drain and save) and then resume the job, will the
configuration changes take effect?
3. Will we have to move away from Flink monitoring so that the client can be
initialized inside the constructor?
4. One option is to remove the constructor argument entirely and load the
config inside the open method. How can this be done without exposing the
entire application configuration? I could store the configuration in the job
parameters (by somehow converting this object to a map, which I don't want
to do), but how would I load it back, given that this operator function is
used by multiple operators?
5. Any other option?

For functions that aren't AsyncFunction, is leveraging BroadcastState the
only way to dynamically update configuration?



Re: Changing application configuration when restoring from checkpoint/savepoint

vishalovercome
Will this work: in the main method, serialize the config into a string and
store it using ParameterTool, with taskName as the key and the serialized
config as the value. Then, in the open method, look up the relevant
configuration using getTaskName().
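
A minimal sketch of that idea, kept free of Flink dependencies so the
round trip itself is easy to check. ClientConfig, the "|"-separated encoding,
and the helper names are hypothetical; the comments note where the Flink
calls (ParameterTool.fromMap, setGlobalJobParameters, getTaskName) would
plug in.

```scala
object TaskConfigParams {
  // Hypothetical config type; the real config class is not shown in the thread.
  final case class ClientConfig(endpoint: String, timeoutMs: Long)

  // Hypothetical string encoding, chosen only to keep the sketch self-contained.
  def serialize(c: ClientConfig): String = s"${c.endpoint}|${c.timeoutMs}"

  def deserialize(s: String): ClientConfig = {
    val idx = s.lastIndexOf('|') // the last separator precedes the timeout
    ClientConfig(s.substring(0, idx), s.substring(idx + 1).toLong)
  }

  // main(): one entry per task name. With Flink, this map (converted to a
  // java.util.Map) would be wrapped with ParameterTool.fromMap(...) and
  // registered via env.getConfig.setGlobalJobParameters(...).
  def toJobParameters(byTask: Map[String, ClientConfig]): Map[String, String] =
    byTask.map { case (task, cfg) => task -> serialize(cfg) }

  // open(): look up only this task's entry. With Flink, the map would come from
  // getRuntimeContext.getExecutionConfig.getGlobalJobParameters.toMap and the
  // key from getRuntimeContext.getTaskName.
  def configFor(params: Map[String, String], taskName: String): Option[ClientConfig] =
    params.get(taskName).map(deserialize)
}
```

Each function then sees only its own entry rather than the whole application
configuration. One thing to verify: when operators are chained, the task name
Flink reports may differ from the operator name, so the key has to match
whatever getTaskName() actually returns.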

A follow-up to this is configuring custom windowing functions. I have a
size-based as well as a time-based window class, where the size and time
limits are configurable and passed as constructor arguments. How can this
configuration be changed when state persistence/recovery is enabled? A
window doesn't have an open method per se.





Re: Changing application configuration when restoring from checkpoint/savepoint

Timo Walther
Hi,

I gave some answers in the other mail thread. Some additional comments:
In general, I think even configuration can be considered state in this
case. If the state is not set, the job can be considered a fresh start.
Once the state is set, it would basically be just a configuration
update. You could set timers to periodically query either an external
service or just a distributed file system for an updated version of the
configuration.

If you want to avoid excessive state access, you could also cache the
state's content in a transient variable.
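
The logic described above could be sketched roughly as follows. This is
plain Scala, not Flink code: StringState is a hypothetical stand-in for
Flink's ValueState[String], and ConfigHolder and fetchLatest are
illustrative names. In a real function, current() would be called while
processing records and refresh() from a timer callback.

```scala
object ConfigAsState {
  // Hypothetical stand-in for Flink's ValueState[String]; like ValueState,
  // value() returns null until update() has been called.
  final class StringState {
    private var v: String = null
    def value(): String = v
    def update(x: String): Unit = { v = x }
  }

  // fetchLatest stands in for a query to an external service or a file read.
  final class ConfigHolder(
      state: StringState,
      initial: String,
      fetchLatest: () => Option[String]) {

    // Cache of the state's content, to avoid a state access per record.
    // In a Flink function this would be a @transient field.
    private var cached: String = null

    // Returns the active configuration.
    def current(): String = {
      if (cached == null) {
        // Unset state means a fresh start: seed it with the initial config.
        if (state.value() == null) state.update(initial)
        // Otherwise the restored state wins over the constructor argument.
        cached = state.value()
      }
      cached
    }

    // Applies a configuration update to both the state and the cache.
    def refresh(): Unit = fetchLatest().foreach { c =>
      state.update(c)
      cached = c
    }
  }
}
```

With this shape, a holder created after a restore ignores its initial value
whenever the state is already set, which is the "configuration update"
behaviour rather than the "fresh start" one.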

Regards,
Timo

