Setting "unreliable" RocksDB state backend w/o "execution.checkpointing.interval" and "state.checkpoints.dir" properties


Dongwon Kim-2
Hi,

I've defined some states in an instance of KeyedProcessFunction so that each key has its own state.
As the state can grow quite large, I want it to be stored on local disks via the RocksDB state backend.
To that end, I specified only a single property in my flink-conf.yaml:
state.backend: rocksdb

However, it causes the following exception:
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the RocksDB state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:45)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:32)
        at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:165)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:854)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:797)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:243)
        at org.apache.flink.client.program.StreamContextEnvironment.<init>(StreamContextEnvironment.java:67)
        at org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:165)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2051)
        at java.util.Optional.map(Optional.java:215)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2051)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2034)
        at com.kakaomobility.drivinghabit.stream.BlueGreenComparator.main(BlueGreenComparator.java:120)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:343)
... 11 more 

Okay, so I added state.checkpoints.dir to flink-conf.yaml:
state.backend: rocksdb 
state.checkpoints.dir: hdfs:///path/to/ckpts 

However, periodic checkpointing is still effectively disabled because I didn't specify "execution.checkpointing.interval", as the attached screenshot shows.
image.png

What if I want to use RocksDB purely as unreliable per-key state storage, with no checkpointing at all?
Do I still have to specify "state.checkpoints.dir", even though it has no effect here?

Best,

Dongwon
 
Re: Setting "unreliable" RocksDB state backend w/o "execution.checkpointing.interval" and "state.checkpoints.dir" properties

耿延杰
Hi,

As far as I know, Flink instantiates RocksDBStateBackend in one of two ways:
- from your user code, for example:
RocksDBStateBackend rocksDBStateBackend =
    new RocksDBStateBackend(EventProcessorConstant.CHECKPOINT_PATH, true);
env.setStateBackend(rocksDBStateBackend);
- from the config file, if you don't set it in your code.

Looking at the RocksDBStateBackend class [1], all of its constructors require a 'CheckpointPathUri'.

Thus, you must set the checkpoint path in the config file, even if you disable checkpointing.


[1]:https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/contrib/streaming/state/RocksDBStateBackend.html



------------------ Original message ------------------
From: "Dongwon Kim" <[hidden email]>;
Sent: Wednesday, January 27, 2021, 3:18 PM
To: "user" <[hidden email]>;
Subject: Setting "unreliable" RocksDB state backend w/o "execution.checkpointing.interval" and "state.checkpoints.dir" properties

Re: Setting "unreliable" RocksDB state backend w/o "execution.checkpointing.interval" and "state.checkpoints.dir" properties

Dongwon Kim-2
Hi 耿延杰,

Wouldn't it be better to require "state.checkpoints.dir" only when "execution.checkpointing.interval" is actually set?
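
To make the suggestion concrete, here is a minimal, self-contained sketch in plain Java. It is not Flink's actual validation code: the class CheckpointDirRule and its two methods are hypothetical names invented for illustration, and the configuration is modeled as a simple Map. It only contrasts the behaviour reported in this thread (RocksDB always requires a checkpoint directory) with the proposed rule (require it only when a checkpoint interval is configured).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the proposed rule; NOT Flink's real validation logic. */
final class CheckpointDirRule {
    static final String BACKEND = "state.backend";
    static final String CHECKPOINT_DIR = "state.checkpoints.dir";
    static final String INTERVAL = "execution.checkpointing.interval";

    private CheckpointDirRule() {}

    /** Behaviour reported in this thread: RocksDB always needs a checkpoint directory. */
    static boolean validAsReported(Map<String, String> conf) {
        if (!"rocksdb".equals(conf.get(BACKEND))) {
            return true; // this sketch only models the RocksDB case
        }
        return conf.containsKey(CHECKPOINT_DIR);
    }

    /** Proposed rule: require the directory only when a checkpoint interval is set. */
    static boolean validUnderProposal(Map<String, String> conf) {
        if (!"rocksdb".equals(conf.get(BACKEND))) {
            return true; // this sketch only models the RocksDB case
        }
        boolean periodicCheckpoints = conf.containsKey(INTERVAL);
        return !periodicCheckpoints || conf.containsKey(CHECKPOINT_DIR);
    }

    public static void main(String[] args) {
        // The single-property configuration from the first message.
        Map<String, String> conf = new HashMap<>();
        conf.put(BACKEND, "rocksdb");

        System.out.println(validAsReported(conf));    // prints false
        System.out.println(validUnderProposal(conf)); // prints true
    }
}
```

With the single-property configuration from my first mail, the reported behaviour rejects the setup while the proposed rule would accept it. The sketch deliberately ignores savepoints and manually triggered checkpoints, which might still need a target directory.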

Best,

Dongwon



On Wed, Jan 27, 2021 at 5:22 PM 耿延杰 <[hidden email]> wrote:
Re: Setting "unreliable" RocksDB state backend w/o "execution.checkpointing.interval" and "state.checkpoints.dir" properties

耿延杰

Hi Dongwon,
I'm looping in Congxian Qiu, who is a Flink expert.

Hi Congxian Qiu,
Could you answer Dongwon's question, and what do you think about his suggestion?

------------------ Original message ------------------
From: "Dongwon Kim" <[hidden email]>;
Sent: Thursday, January 28, 2021, 8:25 AM
To: "耿延杰" <[hidden email]>;
Cc: "user" <[hidden email]>;
Subject: Re: Setting "unreliable" RocksDB state backend w/o "execution.checkpointing.interval" and "state.checkpoints.dir" properties
