Question regarding checkpoint/savepoint and State Processor API

Eleanore Jin
Hi there, 

1. In my job I have a broadcast stream. Initially, there is no savepoint that can provide bootstrap values for the broadcast state.
BootstrapTransformation transform = OperatorTransformation.bootstrapWith(dataSet).transform(bootstrapFunction);
Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
.withOperator(OPERATOR_UID, transform)
.write("file:///tmp/new_savepoints");
Question: bootstrapWith(dataSet) requires a DataSet. Normally the DataSet comes from an old savepoint, but in this case I don't have one. How should I handle this? Or is a DataSet strictly required?

2. As messages come through the broadcast stream, the state gets updated.

3. I would like to periodically save the broadcast state to a file via savepoints:
Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
.withOperator(OPERATOR_UID, transform)
.write("file:///tmp/new_savepoints");

4. When the job is cancelled and later restarted, the initial broadcast state should be loaded from the previous savepoint:
ExistingSavepoint existingSavepoint = Savepoint.load(environment, "file:///tmp/smarts/checkpoints/85b69cb38897b9ac66a925fee4ecea2c/chk-5", new MemoryStateBackend());
dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID, OPERATOR_NAME, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
Question: assuming I now have the old state as a DataSet, how can I use it as the initial broadcast state in the BroadcastProcessFunction?
Thanks a lot for the help!

Eleanore
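For steps 3 and 4, the usual approach is to let Flink persist the broadcast state itself instead of writing savepoints with the State Processor API. A minimal sketch, assuming the Flink 1.9/1.10 DataStream API and a `state.checkpoints.dir` set in the cluster configuration: enable periodic checkpoints and retain them on cancellation, which is what produces directories like the `chk-5` path in step 4. The class name `RetainedCheckpointsConfig` and the 60-second interval are illustrative only. Savepoints can also be triggered on demand with `flink savepoint <jobId>`, and either kind of snapshot can be passed to `flink run -s <path>` when restarting.

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointsConfig {

    /** Configures periodic checkpoints that are kept after the job is cancelled. */
    static StreamExecutionEnvironment configure(StreamExecutionEnvironment env) {
        // Checkpoint every 60 s (assumed interval). Checkpoints include all operator
        // state, broadcast state too, so no State Processor API code is needed here.
        env.enableCheckpointing(60_000L);

        // Keep the latest completed checkpoint (e.g. .../chk-5) on cancellation,
        // so a restarted job can resume from it with `flink run -s <path>`.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        return env;
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                configure(StreamExecutionEnvironment.getExecutionEnvironment());
        // The broadcast pipeline would be built on `env` here before calling env.execute().
        System.out.println("checkpoint interval ms: "
                + env.getCheckpointConfig().getCheckpointInterval());
    }
}
```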
Re: Question regarding checkpoint/savepoint and State Processor API

Eleanore Jin
Hi Seth, 

Thanks for the prompt response! Regarding my second question: once I have converted the existing savepoint into a DataSet, how can I turn that DataSet into BroadcastState?

For example, in my BroadcastProcessFunction: 
@Override
public void processBroadcastElement(String key, Context context, Collector<JsonNode> collector) throws Exception {
    // TODO: how do I pre-populate this broadcast state from the existing savepoint?
    BroadcastState<String, String> broadcastState = context.getBroadcastState(keySetStateDescriptor);
    broadcastState.put(key, key);
}

Thanks a lot!
Eleanore
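A broadcast state restored from a savepoint does not have to be copied into the function by hand. When the job is submitted with `flink run -s <savepointPath>`, Flink restores the state into the operator whose `uid` matches the uid passed to `withOperator(...)`, as long as the `MapStateDescriptor` has the same name and types. A minimal sketch under those assumptions (Flink 1.9/1.10 DataStream API); the uid `"broadcast-operator"`, the descriptor name `"keySet"`, the class `BroadcastJob`, and the `fromElements` placeholder sources are hypothetical, and `String` output stands in for `JsonNode` to keep it self-contained.

```java
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastJob {

    // Hypothetical uid: must equal the uid used when the savepoint was written.
    static final String OPERATOR_UID = "broadcast-operator";

    // Hypothetical descriptor: same name and types as in the bootstrap function.
    static final MapStateDescriptor<String, String> KEY_SET_DESCRIPTOR = new MapStateDescriptor<>(
            "keySet", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    static class KeySetFunction extends BroadcastProcessFunction<String, String, String> {

        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
                throws Exception {
            // Entries restored from the savepoint are already visible here.
            ReadOnlyBroadcastState<String, String> keys = ctx.getBroadcastState(KEY_SET_DESCRIPTOR);
            if (keys.contains(value)) {
                out.collect(value);
            }
        }

        @Override
        public void processBroadcastElement(String key, Context ctx, Collector<String> out)
                throws Exception {
            // New broadcast elements are added on top of the restored entries.
            BroadcastState<String, String> keys = ctx.getBroadcastState(KEY_SET_DESCRIPTOR);
            keys.put(key, key);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder sources; a real job would read from Kafka, files, etc.
        DataStream<String> events = env.fromElements("a", "b", "c");
        BroadcastStream<String> keyUpdates = env.fromElements("a").broadcast(KEY_SET_DESCRIPTOR);

        events.connect(keyUpdates)
                .process(new KeySetFunction())
                .uid(OPERATOR_UID) // ties this operator to the state stored under this uid
                .print();

        env.execute("broadcast job");
    }
}
```

The important design point is that nothing in the function distinguishes restored entries from new ones: the uid and descriptor name are the only link between the savepoint and the running job.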

On Tue, Jan 21, 2020 at 7:12 AM Seth Wiesman <[hidden email]> wrote:
Hi Eleanore,

Bootstrap data is not required to come from an existing savepoint. It can come from any DataSet, which could be backed by a file, a database, or any other system. The State Processor API is also not a tool you are going to use between every start and stop of your job. It is only meant to bootstrap the initial state of your application. After that, you will use savepoints to carry over the current state of your application between runs.
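As a minimal sketch of bootstrapping without an existing savepoint, assuming Flink 1.9/1.10 with the State Processor API dependency on the classpath: any `DataSet` can feed a `BroadcastStateBootstrapFunction`. Here `env.fromElements(...)` stands in for a file or database source. The uid `"broadcast-operator"`, the descriptor name `"keySet"`, the max parallelism of 128, the class names, and the sample keys are hypothetical; the uid and descriptor must match the ones used by the streaming job. Note that the second argument of `Savepoint.create` is the maximum parallelism.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;

public class BootstrapBroadcastState {

    // Hypothetical values: the uid must match .uid(...) of the broadcast operator.
    static final String OPERATOR_UID = "broadcast-operator";
    static final int MAX_PARALLELISM = 128;

    // Must be identical (name and types) to the descriptor in the streaming job.
    static final MapStateDescriptor<String, String> KEY_SET_DESCRIPTOR = new MapStateDescriptor<>(
            "keySet", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    /** Writes every bootstrap element into the broadcast state as key -> key. */
    static class KeySetBootstrapper extends BroadcastStateBootstrapFunction<String> {
        @Override
        public void processElement(String key, Context ctx) throws Exception {
            ctx.getBroadcastState(KEY_SET_DESCRIPTOR).put(key, key);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Any DataSet works; env.readTextFile(...) or a JDBC input would do the same job.
        DataSet<String> initialKeys = env.fromElements("key-a", "key-b");

        BootstrapTransformation<String> transform = OperatorTransformation
                .bootstrapWith(initialKeys)
                .transform(new KeySetBootstrapper());

        Savepoint.create(new MemoryStateBackend(), MAX_PARALLELISM)
                .withOperator(OPERATOR_UID, transform)
                .write("file:///tmp/new_savepoints");

        // The write is lazy: nothing is produced until the batch job is executed.
        env.execute("bootstrap broadcast state");
    }
}
```

If there is genuinely nothing to bootstrap, the simpler option is to skip this step and start the streaming job without a savepoint; the broadcast state then starts empty.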



On Mon, Jan 20, 2020 at 6:07 PM Jin Yi <[hidden email]> wrote:
[Quoted text hidden]


--

Seth Wiesman | Solutions Architect

+1 314 387 1463


Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany