@Override
public void processBroadcastElement(String key, Context context, Collector<JsonNode> collector) throws Exception {
    // TODO: how do I add the existing BroadcastState from a savepoint beforehand?
    // Store every key received on the broadcast side in the broadcast state.
    BroadcastState<String, String> broadcastState = context.getBroadcastState(keySetStateDescriptor);
    broadcastState.put(key, key);
}
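Regarding the TODO in the snippet above: when a job is started from a savepoint, Flink restores broadcast state on its own. It matches the state by the operator's uid and the state descriptor's name, so processBroadcastElement does not have to load anything by hand. The following is a minimal sketch assuming Flink 1.9/1.10. The uid value "broadcast-operator", the descriptor name "key-set", the KeyFilter class and the fromElements sources are hypothetical placeholders, not names from the thread.

```java
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastJob {

    // Hypothetical names: they must match the uid and descriptor name used when the
    // savepoint was written (by the State Processor API or by a previous run of this job).
    public static final String OPERATOR_UID = "broadcast-operator";
    public static final MapStateDescriptor<String, String> KEY_SET_DESCRIPTOR =
            new MapStateDescriptor<>("key-set",
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    /** Hypothetical function: forwards main-stream values whose key is in the broadcast set. */
    public static class KeyFilter extends BroadcastProcessFunction<String, String, String> {

        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
                throws Exception {
            // After a restart from a savepoint, the restored entries are already visible here.
            ReadOnlyBroadcastState<String, String> keys = ctx.getBroadcastState(KEY_SET_DESCRIPTOR);
            if (keys.contains(value)) {
                out.collect(value);
            }
        }

        @Override
        public void processBroadcastElement(String key, Context ctx, Collector<String> out)
                throws Exception {
            // No manual loading: Flink restores this state before any element is processed.
            BroadcastState<String, String> keys = ctx.getBroadcastState(KEY_SET_DESCRIPTOR);
            keys.put(key, key);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical sources; replace with the real main and broadcast streams.
        DataStream<String> events = env.fromElements("a", "b", "c");
        DataStream<String> keyUpdates = env.fromElements("a", "c");

        BroadcastStream<String> broadcastKeys = keyUpdates.broadcast(KEY_SET_DESCRIPTOR);

        events.connect(broadcastKeys)
                .process(new KeyFilter())
                .uid(OPERATOR_UID) // ties this operator to its state in the savepoint
                .print();

        env.execute("broadcast-state-job");
    }
}
```

To resume with the saved state, the job would be submitted with something like `flink run -s <savepointPath> ...`. The type information in the descriptor also has to be compatible with the serializers stored in the savepoint.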
Hi Eleanore,

Bootstrap data is not required to come from an existing savepoint. It can come from any DataSet, which could be backed by a file, a database, or any other system. The State Processor API is also not a tool you are going to use between every start and stop of your job. It is only meant for bootstrapping the initial state of your application. After that, you will use savepoints to carry over the current state of your application between runs.

On Mon, Jan 20, 2020 at 6:07 PM Jin Yi <[hidden email]> wrote:

Hi there,

1. In my job, I have a broadcast stream. Initially there is no savepoint that can be used as bootstrap values for the broadcast stream state.

BootstrapTransformation transform = OperatorTransformation.bootstrapWith(dataSet).transform(bootstrapFunction);
Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
.withOperator(OPERATOR_UID, transform)
.write("file:///tmp/new_savepoints");

Question: bootstrapWith(dataSet) is required, and normally the dataSet comes from the old savepoint. In this case I don't have one, so how should I deal with it? Or is it strictly required?

2. As messages come through the broadcast stream, the state gets updated.

3. I would like to periodically save the broadcast state to a file via savepoints:

Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
.withOperator(OPERATOR_UID, transform)
.write("file:///tmp/new_savepoints");

4. When the job gets cancelled and restarted the next time, the initial broadcast state should be loaded from the previous savepoint:

ExistingSavepoint existingSavepoint = Savepoint.load(environment, "file:///tmp/smarts/checkpoints/85b69cb38897b9ac66a925fee4ecea2c/chk-5", new MemoryStateBackend());
dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID, OPERATOR_NAME, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

Question: now assume I have the old state as dataSet. How can I use it in the BroadcastProcessFunction as the initial state of the broadcast state?

Thanks a lot for the help!
Eleanore
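Following the answer that bootstrap data can come from any DataSet, here is a minimal sketch that writes an initial savepoint for the broadcast operator without any previous savepoint. It assumes Flink 1.9/1.10 with the flink-state-processor-api dependency. The class names, the uid value "broadcast-operator", the descriptor name "key-set", MAX_PARALLELISM = 128 and the keys file path in the comment are hypothetical; only the output path file:///tmp/new_savepoints is taken from the thread.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;

public class BootstrapBroadcastState {

    // Hypothetical names: they must match the uid and state descriptor of the
    // BroadcastProcessFunction operator in the streaming job.
    public static final String OPERATOR_UID = "broadcast-operator";
    public static final MapStateDescriptor<String, String> KEY_SET_DESCRIPTOR =
            new MapStateDescriptor<>("key-set",
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    // Hypothetical value; keep it consistent with the streaming job's max parallelism.
    public static final int MAX_PARALLELISM = 128;

    /** Writes every incoming key into broadcast state, mirroring processBroadcastElement. */
    public static class KeySetBootstrapper extends BroadcastStateBootstrapFunction<String> {
        @Override
        public void processElement(String key, Context ctx) throws Exception {
            ctx.getBroadcastState(KEY_SET_DESCRIPTOR).put(key, key);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The bootstrap data does not have to come from a savepoint: any DataSet works.
        // This uses an in-memory collection; env.readTextFile("file:///path/to/keys.txt")
        // (hypothetical path) would be the file-backed equivalent.
        DataSet<String> initialKeys = env.fromElements("key-1", "key-2", "key-3");

        BootstrapTransformation<String> transform = OperatorTransformation
                .bootstrapWith(initialKeys)
                .transform(new KeySetBootstrapper());

        Savepoint.create(new MemoryStateBackend(), MAX_PARALLELISM)
                .withOperator(OPERATOR_UID, transform)
                .write("file:///tmp/new_savepoints");

        // write() only defines the DataSet program; execute() actually runs it.
        env.execute("bootstrap-broadcast-state");
    }
}
```

The streaming job would be started once from file:///tmp/new_savepoints; later restarts resume from regular savepoints taken from that running job, so this program is not rerun between stops and starts.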
--
Seth Wiesman | Solutions Architect
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany