
Question regarding checkpoint/savepoint and State Processor API

Posted by Eleanore Jin on Jan 21, 2020; 12:07am
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Question-regarding-checkpoint-savepoint-and-State-Processor-API-tp32284.html

Hi there, 

1. In my job, I have a broadcast stream. Initially there is no savepoint that can be used to bootstrap the broadcast state.
BootstrapTransformation transform = OperatorTransformation.bootstrapWith(dataSet).transform(bootstrapFunction);
// note: the second argument of Savepoint.create is the max parallelism
Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
    .withOperator(OPERATOR_UID, transform)
    .write("file:///tmp/new_savepoints");
Question: bootstrapWith(dataSet) is required. Normally the dataSet comes from an old savepoint, but in this case I don't have one. How should I deal with this? Or is bootstrapping actually mandatory?
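One possible reading of the `bootstrapWith(dataSet)` requirement: the DataSet is only a source of initial key/value pairs, and nothing forces it to come from an old savepoint. In the DataSet API it could presumably be built from hard-coded defaults with `ExecutionEnvironment.fromElements(...)` or `fromCollection(...)`, and if no initial values are needed at all, a job started without a savepoint simply begins with empty state. The sketch below is a plain-Java model of that idea with no Flink dependency; `BootstrapModel`, `initialState` and `defaults` are hypothetical names, not part of the State Processor API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (hypothetical names, not Flink API) of bootstrapping broadcast state.
// The bootstrap input is just a collection of key/value pairs: it may come from an old
// savepoint, a file, hard-coded defaults, or it may be empty.
class BootstrapModel {

    // Builds the initial state, mirroring a bootstrap function that puts each element
    // into the broadcast state. Later entries overwrite earlier ones with the same key.
    static Map<String, String> initialState(List<Map.Entry<String, String>> source) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : source) {
            state.put(entry.getKey(), entry.getValue());
        }
        return state;
    }

    // No old savepoint available: seed the state from hard-coded defaults instead.
    static Map<String, String> defaults() {
        return initialState(List.of(
                Map.entry("rule-1", "enabled"),
                Map.entry("rule-2", "disabled")));
    }
}
```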

2. As messages come through the broadcast stream, the state gets updated.
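The update in step 2 can be modeled as follows. In a `BroadcastProcessFunction`, `processBroadcastElement` receives a context with writable broadcast state, while `processElement` only gets a read-only view of it. The plain-Java sketch below mirrors that split with an ordinary map; `BroadcastStateModel`, `onBroadcast`, `onElement` and `snapshot` are hypothetical names, and the `"unknown"` default is an arbitrary choice for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (hypothetical, not Flink API) of how the two callbacks of a
// BroadcastProcessFunction use broadcast state: broadcast elements write, regular
// elements only read.
class BroadcastStateModel {
    private final Map<String, String> broadcastState;

    BroadcastStateModel(Map<String, String> initial) {
        // Start from bootstrapped or restored values, if any.
        this.broadcastState = new HashMap<>(initial);
    }

    // Analogue of processBroadcastElement: each broadcast message upserts one entry.
    void onBroadcast(String key, String value) {
        broadcastState.put(key, value);
    }

    // Analogue of processElement: the data side only looks values up.
    String onElement(String key) {
        return broadcastState.getOrDefault(key, "unknown");
    }

    // Immutable copy of the current state, e.g. for saving it somewhere.
    Map<String, String> snapshot() {
        return Map.copyOf(broadcastState);
    }
}
```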

3. I would like to periodically save the broadcast state to a file via savepoints:
Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
.withOperator(OPERATOR_UID, transform)
.write("file:///tmp/new_savepoints");
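One point that may be worth checking for step 3: `Savepoint.create(...).write(...)` builds a brand-new savepoint from a batch `DataSet`; it does not capture the state of the running streaming job. Persisting a running job's state periodically is normally handled by checkpointing (for example `StreamExecutionEnvironment.enableCheckpointing(intervalMillis)`) or by savepoints triggered from outside the job. The plain-Java sketch below only models the idea of a periodic snapshot written to a file; `PeriodicSnapshotter`, the properties-file format and the write-then-rename step are assumptions for illustration, not how Flink stores savepoints.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Plain-Java model (hypothetical names) of "periodically save the state to a file".
// In Flink this role is normally played by checkpoints/savepoints of the running job.
class PeriodicSnapshotter {
    private final Supplier<Map<String, String>> state;
    private final Path target;

    PeriodicSnapshotter(Supplier<Map<String, String>> state, Path target) {
        this.state = state;
        this.target = target;
    }

    // Writes one snapshot to a temp file in the same directory, then renames it
    // over the target so a reader never sees a half-written file.
    void snapshotOnce() throws IOException {
        Properties props = new Properties();
        props.putAll(state.get());
        Path tmp = Files.createTempFile(target.toAbsolutePath().getParent(), "snap", ".tmp");
        try (Writer w = Files.newBufferedWriter(tmp)) {
            props.store(w, "broadcast state snapshot");
        }
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }

    // Schedules snapshotOnce() every periodMs milliseconds on the given executor.
    ScheduledFuture<?> start(ScheduledExecutorService exec, long periodMs) {
        return exec.scheduleAtFixedRate(() -> {
            try {
                snapshotOnce();
            } catch (IOException e) {
                // A failed snapshot is only reported; the previous file stays intact.
                System.err.println("snapshot failed: " + e);
            }
        }, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }
}
```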

4. When the job gets cancelled and is later restarted, the initial broadcast state should be loaded from the previous savepoint:
ExistingSavepoint existingSavepoint = Savepoint.load(environment, "file:///tmp/smarts/checkpoints/85b69cb38897b9ac66a925fee4ecea2c/chk-5", new MemoryStateBackend());
dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID, OPERATOR_NAME, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
Question: assuming I now have the old state as dataSet, how can I use it in the BroadcastProcessFunction as the initial value of the broadcast state?
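For step 4, the usual route described in the Flink documentation is not to read the savepoint into a `DataSet` and inject it into the function by hand, but to start the streaming job from the savepoint or retained checkpoint (for example `flink run -s <path> ...`). Flink then restores the broadcast state into the operator with the matching uid, presumably as long as the `MapStateDescriptor` name is unchanged, while `readBroadcastState` remains useful for inspecting or transforming that state offline. The plain-Java sketch below models only the restart logic (load the previous snapshot if one exists, otherwise start empty); `RestoreModel`, `save` and `restore` are hypothetical names.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Plain-Java model (hypothetical names, not Flink API) of "load the previous snapshot
// as the initial state on restart".
class RestoreModel {

    // Stores the key/value pairs as a properties file.
    static void save(Path snapshot, Map<String, String> state) throws IOException {
        Properties props = new Properties();
        props.putAll(state);
        try (Writer w = Files.newBufferedWriter(snapshot)) {
            props.store(w, null);
        }
    }

    // Returns the saved key/value pairs, or an empty map on the very first run.
    static Map<String, String> restore(Path snapshot) throws IOException {
        Map<String, String> state = new HashMap<>();
        if (!Files.exists(snapshot)) {
            return state; // no previous snapshot: start empty, like a job started without -s
        }
        Properties props = new Properties();
        try (Reader r = Files.newBufferedReader(snapshot)) {
            props.load(r);
        }
        for (String key : props.stringPropertyNames()) {
            state.put(key, props.getProperty(key));
        }
        return state;
    }
}
```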
Thanks a lot for the help!

Eleanore