Hi there,
1. In my job I have a broadcast stream. Initially there is no savepoint that can be used to bootstrap the broadcast stream's state:

    BootstrapTransformation transform = OperatorTransformation.bootstrapWith(dataSet).transform(bootstrapFunction);
    Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)

   Question: bootstrapWith(dataSet) is required, and normally the dataSet comes from an old savepoint. In this case I don't have one, so how should I deal with it? Or is it strictly required?

2. As messages come through the broadcast stream, the state gets updated.

3. I would like to periodically save the broadcast state to a file via savepoints:

    Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
        .withOperator(OPERATOR_UID, transform)
        .write("file:///tmp/new_savepoints");

4. When the job gets cancelled and is restarted later, the initial broadcast state should be loaded from the previous savepoint:

    ExistingSavepoint existingSavepoint = Savepoint.load(environment, "file:///tmp/smarts/checkpoints/85b69cb38897b9ac66a925fee4ecea2c/chk-5", new MemoryStateBackend());
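On question 1: bootstrapWith does need a DataSet, but that DataSet does not have to come from an old savepoint; any DataSet works, including one built from literal elements. (If there is no initial data at all, the streaming job can simply be started without a savepoint, and its broadcast state starts out empty.) Note also that this is a separate batch job: it writes a new savepoint, it does not snapshot the state of a running streaming job. The sketch below assumes the Flink 1.9/1.10-era State Processor API (flink-state-processor-api on the classpath), String keys and values, and hypothetical names (RulesBootstrapFunction, RULES_DESCRIPTOR, the state name "rules", the uid "broadcast-operator", MAX_PARALLELISM). The second argument of Savepoint.create is the operator's maximum parallelism.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;

public class BootstrapBroadcastState {

    // Hypothetical: must equal the uid set on the broadcast operator in the streaming job.
    static final String OPERATOR_UID = "broadcast-operator";
    // Hypothetical value; passed to Savepoint.create as the max parallelism.
    static final int MAX_PARALLELISM = 128;

    // Hypothetical: name and types must match the descriptor used by the BroadcastProcessFunction.
    static final MapStateDescriptor<String, String> RULES_DESCRIPTOR =
            new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

    /** Copies each (key, value) element of the input DataSet into the broadcast state. */
    public static class RulesBootstrapFunction
            extends BroadcastStateBootstrapFunction<Tuple2<String, String>> {
        @Override
        public void processElement(Tuple2<String, String> value, Context ctx) throws Exception {
            ctx.getBroadcastState(RULES_DESCRIPTOR).put(value.f0, value.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // No previous savepoint: build the initial state from literal elements instead.
        DataSet<Tuple2<String, String>> initialRules = env.fromElements(
                Tuple2.of("rule-1", "value-1"),
                Tuple2.of("rule-2", "value-2"));

        BootstrapTransformation<Tuple2<String, String>> transform = OperatorTransformation
                .bootstrapWith(initialRules)
                .transform(new RulesBootstrapFunction());

        Savepoint.create(new MemoryStateBackend(), MAX_PARALLELISM)
                .withOperator(OPERATOR_UID, transform)
                .write("file:///tmp/new_savepoints");

        // write() only adds the savepoint output to the batch plan; execute() actually runs it.
        env.execute("bootstrap broadcast state");
    }
}
```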
Thanks a lot for the help!
Eleanore
Hi Seth,

Thanks for the prompt response! Regarding my second question: once I have converted the existing savepoint into a DataSet, how can I turn that DataSet into BroadcastState? For example, in my BroadcastProcessFunction:

    @Override
    // TODO: how to add the existing BroadcastState from the savepoint beforehand?

Thanks a lot!
Eleanore

On Tue, Jan 21, 2020 at 7:12 AM Seth Wiesman <[hidden email]> wrote:
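On the follow-up question: the broadcast state normally does not have to be rebuilt from a DataSet inside the BroadcastProcessFunction. When the streaming job is started from a savepoint (for example bin/flink run -s file:///tmp/new_savepoints <path-to-job-jar>), Flink restores the broadcast state of the operator whose uid matches the uid used in withOperator, so the function already sees the bootstrapped entries. A minimal sketch, assuming the same hypothetical descriptor ("rules", String to String) and uid ("broadcast-operator") as on the bootstrap side, String events, and placeholder fromElements sources that stand in for the real streams:

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RulesJob {

    // Hypothetical: must equal the uid passed to withOperator when the savepoint was written.
    static final String OPERATOR_UID = "broadcast-operator";

    // Hypothetical: same name and types as the descriptor used by the bootstrap function.
    static final MapStateDescriptor<String, String> RULES_DESCRIPTOR =
            new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

    public static class RulesFunction
            extends BroadcastProcessFunction<String, Tuple2<String, String>, String> {

        @Override
        public void processElement(String event, ReadOnlyContext ctx, Collector<String> out)
                throws Exception {
            // When restored from the savepoint, this already holds the bootstrapped entries.
            ReadOnlyBroadcastState<String, String> rules = ctx.getBroadcastState(RULES_DESCRIPTOR);
            String rule = rules.get(event);
            out.collect(event + " -> " + (rule == null ? "no rule" : rule));
        }

        @Override
        public void processBroadcastElement(Tuple2<String, String> update, Context ctx,
                Collector<String> out) throws Exception {
            // Updates arriving on the broadcast stream modify the restored state in place.
            ctx.getBroadcastState(RULES_DESCRIPTOR).put(update.f0, update.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder sources for illustration only.
        DataStream<String> events = env.fromElements("rule-1", "rule-3");
        DataStream<Tuple2<String, String>> updates = env.fromElements(Tuple2.of("rule-3", "value-3"));

        BroadcastStream<Tuple2<String, String>> broadcastUpdates = updates.broadcast(RULES_DESCRIPTOR);

        events.connect(broadcastUpdates)
                .process(new RulesFunction())
                .uid(OPERATOR_UID) // the uid goes on the operator that owns the broadcast state
                .print();

        env.execute("rules job");
    }
}
```

The descriptor (name and types) and the uid on the process(...) operator are what link the two jobs; if either differs, the bootstrapped state will not be restored into this operator.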