Global State and Scaling

Posted by Elias Levy on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Global-State-and-Scaling-tp15051.html

I am implementing a control stream.  The stream communicates a global configuration value for the whole job.  It uses DataStream.broadcast() to communicate this to all parallel operator instances.  I would like to save this value in state so that it can be recovered when the job restarts/recovers.  The control stream is not keyed, so the only option is Operator state.

I could implement this using the ListCheckpointed interface, returning Collections.singletonList(configValue) from snapshotState.  It is clear what I'd need to do in restoreState in the case of scale in.  If I include a serial number in the config, and an operator instance receives multiple values on restore, it can keep the config value with the largest serial number, which indicates the latest config.
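To make the scale-in case concrete, here is a minimal plain-Java sketch of the selection step only. It does not use any Flink classes; VersionedConfig and ConfigRestore are hypothetical names I made up for illustration, and the list argument stands in for whatever restoreState would hand to an operator instance.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical config holder: a value plus a monotonically increasing serial number.
final class VersionedConfig {
    final long serial;
    final String value;

    VersionedConfig(long serial, String value) {
        this.serial = serial;
        this.value = value;
    }
}

final class ConfigRestore {
    // Returns the restored config with the largest serial number.
    // The result is empty when the restored list is empty, which is
    // what an instance could see after scale out.
    static Optional<VersionedConfig> latest(List<VersionedConfig> restored) {
        return restored.stream()
                .max(Comparator.comparingLong((VersionedConfig c) -> c.serial));
    }
}
```

Returning an Optional forces the caller to decide what to do when nothing was restored, which is exactly the scale-out case.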

Alas, it is not clear what should happen on scale out, as some operator instances will receive empty lists.

It seems the other alternative is to use CheckpointedFunction, along with union redistribution via getUnionListState, and then have each operator instance select from the union list the config with the latest serial number, of which there should be multiple copies.  But this seems like an ugly hack.
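The difference between the two redistribution modes on scale out can be modelled in plain Java. This is only a toy model, not Flink code: RedistributionSketch is a hypothetical class, and the round-robin assignment in evenSplit is an assumption for illustration (the actual assignment of entries to instances may differ).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of how snapshotted list entries could reach the new instances.
final class RedistributionSketch {

    // Split mode: every entry goes to exactly one new instance.
    // Round-robin here is an assumption made for this sketch.
    static <T> List<List<T>> evenSplit(List<T> entries, int newParallelism) {
        List<List<T>> perInstance = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perInstance.add(new ArrayList<>());
        }
        for (int i = 0; i < entries.size(); i++) {
            perInstance.get(i % newParallelism).add(entries.get(i));
        }
        return perInstance;
    }

    // Union mode: every new instance receives a copy of all entries.
    static <T> List<List<T>> union(List<T> entries, int newParallelism) {
        List<List<T>> perInstance = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perInstance.add(new ArrayList<>(entries));
        }
        return perInstance;
    }
}
```

With two snapshotted entries and four new instances, evenSplit leaves two instances with empty lists, while union gives all four instances both entries, so each can pick the one with the largest serial number.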


In addition, the documentation is unclear on the relationship and effect, if any, of the maximum parallelism Flink job parameter on operator state, whereas it is much clearer in this regard as it relates to keyed state via key groups.


How are folks handling this use case, i.e. storing and restoring global config values via Flink state?