I have the following piece of configuration in flink-conf.yaml:

    high-availability: zookeeper
    high-availability.storageDir: file:///home/flink/flink-ha-data
    high-availability.zookeeper.quorum: localhost:2181
    state.backend: rocksdb
    state.backend.incremental: true
    state.checkpoints.dir: file:///home/flink/checkpoints

And in my code (Main.class):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStateBackend(new RocksDBStateBackend("file:///home/flink/checkpoint-data", true));
    env.enableCheckpointing(Duration.ofMinutes(5).toMillis());

The next class should save data in the state store when an event is received:

    public class StateManager extends KeyedProcessFunction<String, String, String> {

        private ValueState<String> events;

        @Override
        public void processElement(String s, Context context, Collector<String> collector) throws Exception {
            System.out.println("events: " + events.value()); // Check last value for this key
            Model model = new Gson().fromJson(s, Model.class);
            events.update(model.toString());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("state", Types.STRING);
            events = getRuntimeContext().getState(stateDescriptor);
            System.out.println("In open");
        }
    }

But when I stop the job and start it again, I see no saved data. I check it by printing the data to sysout: the value is null after restarting the job.

Why do I get this behavior? Maybe my settings are not proper?

Thanks,
Yuri L.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi Yuri,
The state that you access with getRuntimeContext().getState(...) is scoped to the current key, so for every new key this state will be null. What key do you use?

Regards,
Roman

On Fri, Mar 12, 2021 at 7:22 AM Maminspapin <[hidden email]> wrote:
> ...
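To illustrate Roman's point: conceptually, keyed ValueState behaves like a map from the current key to a value, and value() only ever sees the entry for the key of the record currently being processed. The sketch below is a plain-Java analogy of that scoping rule, not the actual Flink API — the class and method names are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Analogy for Flink keyed state: one logical value per key.
// Flink sets the "current key" before each processElement() call;
// value()/update() then operate only on that key's slot.
class KeyedStateAnalogy {
    private final Map<String, String> perKeyState = new HashMap<>();
    private String currentKey;

    void setCurrentKey(String key) {      // done by Flink, not user code
        currentKey = key;
    }

    String value() {                      // like ValueState.value()
        return perKeyState.get(currentKey);
    }

    void update(String v) {               // like ValueState.update(v)
        perKeyState.put(currentKey, v);
    }
}
```

With this model, a record arriving with a never-before-seen key reads null even though other keys already hold state.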
Hey, Roman
I use the same key every time, and I get the correct value in StateManager every time the processElement() method executes.

But then I stop the job and submit it again, and the first execution of processElement() gives me null from the state store. The key hasn't changed.

So I'm confused.

Thanks,
Yuri L.
Are you starting the job from a savepoint [1] when submitting it again?
If not, it is considered a new job and will not pick up the old state.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#starting-a-job-from-a-savepoint

Regards,
Roman

On Fri, Mar 12, 2021 at 10:08 AM Maminspapin <[hidden email]> wrote:
> ...
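For reference, the stop-with-savepoint-and-resume cycle Roman describes looks roughly like this with the Flink CLI (the job ID, savepoint directory, and jar name below are placeholders, not values from this thread):

    # Take a savepoint and stop the job gracefully
    ./bin/flink stop --savepointPath file:///home/flink/savepoints <jobId>

    # Resubmit, restoring state from the savepoint that was just written
    ./bin/flink run -s file:///home/flink/savepoints/savepoint-<id> my-job.jar

Without the -s flag on resubmission, the new job starts with empty state regardless of what checkpoints or savepoints exist on disk.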
Roman, thank you for your attention.
It looks like you are absolutely right. Thank you very much for helping.

Before submitting the job I do these steps:

1. ./bin/start-cluster.sh
2. ./bin/taskmanager.sh start

And in my code there is this line:

    env.setStateBackend(new RocksDBStateBackend("file:///home/flink/checkpoint-data", true));

So I have a directory 'checkpoint-data', and there I can see a chk-x (x = checkpoint index) folder. I assume it is responsible for storing my state as a full snapshot. When I stop the app, this chk-x folder is removed, so I can't recover from that point.

I added these lines:

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

And now it works.

P.S.: But maybe it's better to use a savepoint conceptually (not a checkpoint).

Thanks again,
Yuri L.
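With RETAIN_ON_CANCELLATION set, the retained checkpoint can be used to restore state the same way as a savepoint, via the -s flag. A rough sketch of the resume command (the job ID and checkpoint number are placeholders; the base path matches the RocksDBStateBackend path from this thread):

    # Resubmit, restoring from a retained (externalized) checkpoint
    ./bin/flink run -s file:///home/flink/checkpoint-data/<jobId>/chk-<n> my-job.jar

Note the conceptual difference Yuri mentions: checkpoints are Flink's own recovery mechanism and their format is tied to the state backend, while savepoints are user-triggered, portable snapshots intended for planned restarts, upgrades, and rescaling. For a deliberate stop/resubmit cycle, a savepoint is generally the recommended tool.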