No saving data using rocksdb

Maminspapin
I have the following piece of configuration in flink-conf.yaml:

Key                                   Value
high-availability                     zookeeper
high-availability.storageDir          file:///home/flink/flink-ha-data
high-availability.zookeeper.quorum    localhost:2181
state.backend                         rocksdb
state.backend.incremental             true
state.checkpoints.dir                 file:///home/flink/checkpoints

And in my code (Main.class):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStateBackend(new RocksDBStateBackend("file:///home/flink/checkpoint-data", true));
env.enableCheckpointing(Duration.ofMinutes(5).toMillis());

Also, the following class should save data to the state store when an event is received:

public class StateManager extends KeyedProcessFunction<String, String, String> {

    private ValueState<String> events;

    @Override
    public void processElement(String s, Context context, Collector<String> collector) throws Exception {
        System.out.println("events: " + events.value()); // Check last value for this key

        Model model = new Gson().fromJson(s, Model.class);
        events.update(model.toString());
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("state", Types.STRING);
        events = getRuntimeContext().getState(stateDescriptor);
        System.out.println("In open");
    }
}


But when I stop the job and start it again, I don't see any saved data. I check it
by printing the data to sysout, and the value is null after restarting the job.

Why do I get this behavior? Maybe my settings are not correct?

Thanks,
Yuri L.



Re: No saving data using rocksdb

Roman Khachatryan
Hi Yuri,

The state that you access with getRuntimeContext().getState(...) is
scoped to the key (so for every new key this state will be null).
What key do you use?
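
For illustration, a minimal sketch of what I mean (the key selector and Model.getId() here are my assumptions, not taken from your code):

// Each distinct key produced by keyBy(...) gets its own ValueState instance,
// so the first processElement() call for a previously unseen key sees events.value() == null.
env.fromElements("{\"id\":\"a\"}", "{\"id\":\"a\"}", "{\"id\":\"b\"}")
   .keyBy(json -> new Gson().fromJson(json, Model.class).getId())
   .process(new StateManager())
   .print();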

Regards,
Roman

Re: No saving data using rocksdb

Maminspapin
Hey, Roman

I use the same key every time.
And I get the correct value in StateManager every time the processElement()
method executes.

But then I stop the job and submit it again.
And the first execution of processElement() gives me null from the state store.
The key didn't change.

So I'm confused.

Thanks,
Yuri L.




Re: No saving data using rocksdb

Roman Khachatryan
Are you starting the job from a savepoint [1] when submitting it again?
If not, it is considered a new job and will not pick up the old state.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#starting-a-job-from-a-savepoint
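
For example, stopping with a savepoint and then resuming from it looks roughly like this (job id, jar name, and savepoint path are placeholders):

# Stop the job and take a savepoint; the command prints the savepoint path
./bin/flink stop --savepointPath /home/flink/savepoints <jobId>

# Resubmit the job, restoring its state from that savepoint
./bin/flink run -s <savepointPath> your-job.jar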

Regards,
Roman



Re: No saving data using rocksdb

Maminspapin
Roman, thank you for your attention.

It looks like you are absolutely right. Thank you very much for helping.

Before submitting a job I do the following steps:
1. ./bin/start-cluster.sh
2. ./bin/taskmanager.sh start

And in my code there is this line:
env.setStateBackend(new RocksDBStateBackend("file:///home/flink/checkpoint-data", true));

So I have a directory 'checkpoint-data', and there I can see a chk-x folder (x = index
of the checkpoint). I assume it is responsible for storing my state as a
full snapshot.

When I stop the app, this chk-x folder is removed, so I can't recover from
that point.

I added these lines:
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

And now it works.
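
For reference, the whole checkpointing setup now looks roughly like this (same paths and interval as above, shown together as one sketch):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("file:///home/flink/checkpoint-data", true));
env.enableCheckpointing(Duration.ofMinutes(5).toMillis());
// Keep completed checkpoints when the job is cancelled, so they can be used for recovery
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);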

P.S.: But maybe it's conceptually better to use a savepoint (not a checkpoint).

Thanks again,
Yuri L.


