Hi Yang,
Why did your checkpoint fail: did it expire, or did it fail because of an error?
Could you paste the jstack output showing what RocksDB is doing during the checkpoint?
BTW, you could also use async-profiler [1] to see where the CPU time is spent; this tool can help show what RocksDB is doing.
Best
Yun Tang
From: Andrey Zagrebin <[hidden email]>
Sent: Friday, December 4, 2020 17:49
To: user <[hidden email]>
Subject: Re: Flink 1.9Version State TTL parameter configuration it does not work
Hi Yang,
(redirecting this to user mailing list as this is not a dev question)
I am not sure why the state loading is stuck after enabling the compaction filter
but the background cleanup of RocksDB state with TTL will not work without activating the filter.
This happens on RocksDB opening in Flink, before any state is created and it starts to load.
Which version of Flink do you use?
Did you try to enable the filter without starting from the checkpoint, basically from the beginning of the job run?
Best,
Andrey
On Fri, Dec 4, 2020 at 11:27 AM Yang Peng <[hidden email]> wrote:
Hi, I have some questions about state TTL to consult with everybody, the
Hi Yun Tang,
Thanks for your help. The checkpoint failed because it expired. Sorry, the jstack log was lost; I will restart the job and save the jstack output. We are also trying to use the flame graph. Thank you very much.
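When a jstack dump has been lost, a rough in-process substitute can sometimes help. The sketch below is an assumption-laden illustration, not something from this thread: the class `StackFrameFinder` and its method `threadsIn` are hypothetical names, and the code only sees threads of the JVM it runs in, so it would have to run inside the TaskManager process (jstack itself attaches from outside and remains the simpler tool). It lists the live threads that currently have a frame in a given class, for example `org.rocksdb.Checkpoint`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: finds live threads that are currently inside a given class. */
public class StackFrameFinder {

    /** Returns "name [state]" for every live thread with a frame in className. */
    static List<String> threadsIn(String className) {
        List<String> hits = new ArrayList<>();
        // Thread.getAllStackTraces() snapshots the stacks of all live threads in this JVM.
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            for (StackTraceElement frame : e.getValue()) {
                if (frame.getClassName().equals(className)) {
                    hits.add(e.getKey().getName() + " [" + e.getKey().getState() + "]");
                    break; // one hit per thread is enough
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // In a plain JVM without RocksDB this list is empty; inside a TaskManager
        // it would name the threads that are currently taking a native checkpoint.
        System.out.println(threadsIn("org.rocksdb.Checkpoint"));
    }
}
```

Calling this periodically and logging the result would show which subtask threads sit in the native checkpoint call and for how long, which is the same information the jstack dump gives.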
In reply to this post by Andrey Zagrebin-5
Hi Andrey,
Thanks for your help. My job is running on Flink 1.9. I restarted the job without restoring from a checkpoint, and it has been running for 12 hours so far. No checkpoint has failed so far, but the time a checkpoint takes varies greatly, from 1 s to 8 minutes. See Figure 1 for the checkpoint durations:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1922/checkpoint_time.png>
The figure shows that the time is mainly spent in the synchronous phase of the checkpoint on subtask 41. The jstack output for subtask 41 is as follows:

"曝光数据滤重2 (41/60)" #113 prio=5 os_prio=0 tid=0x00007ff4c8476000 nid=0xd2f9 runnable [0x00007ff4b9fa0000]
   java.lang.Thread.State: RUNNABLE
    at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
    at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.takeDBNativeCheckpoint(RocksIncrementalSnapshotStrategy.java:249)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.doSnapshot(RocksIncrementalSnapshotStrategy.java:160)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.snapshot(RocksDBSnapshotStrategyBase.java:126)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:439)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:411)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    - locked <0x00000003debece00> (a java.lang.Object)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:137)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

"Async calls on 曝光数据滤重2 (41/60)" #290 daemon prio=5 os_prio=0 tid=0x00007ff4c8f4e800 nid=0xd4bd waiting on condition [0x00007ff4631c4000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000003debd8dc0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

That is the complete jstack output. I suspect that the earlier checkpoints failed because they timed out: as the state grew, each checkpoint took longer and longer.

Best
Yang

Andrey Zagrebin-5 wrote
> Hi Yang,
>
> (redirecting this to user mailing list as this is not a dev question)
>
> I am not sure why the state loading is stuck after enabling the compaction
> filter
> but the background cleanup of RocksDB state with TTL will not work without
> activating the filter.
> This happens on RocksDB opening in Flink, before any state is created and
> it starts to load.
>
> Which version of Flink do you use?
> Did you try to enable the filter without starting from the
> checkpoint, basically from the beginning of the job run?
>
> Best,
> Andrey
>
> On Fri, Dec 4, 2020 at 11:27 AM Yang Peng <yangpengklf007@> wrote:
>
>> Hi, I have some questions about state TTL to consult with everybody. The
>> state backend is RocksDB. Below is my code:
>> -----------------code begin-------------
>> private static final String EV_STATE_FLAG = "EV_EID_FLAG";
>>
>> StateTtlConfig ttlConfig = StateTtlConfig
>>     .newBuilder(Time.minutes(60))
>>     .updateTtlOnCreateAndWrite()
>>     .neverReturnExpired()
>>     .cleanupInRocksdbCompactFilter(1000)
>>     .build();
>> MapStateDescriptor<String, Integer> eidMapStateDesc = new
>>     MapStateDescriptor<>(EV_STATE_FLAG, BasicTypeInfo.STRING_TYPE_INFO,
>>     BasicTypeInfo.INT_TYPE_INFO);
>> eidMapStateDesc.enableTimeToLive(ttlConfig);
>> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);
>>
>> -----------------code end-----------------
>>
>> I have set the TTL of the state to 60 minutes. But after 12 hours, the
>> RocksDB metrics show that the SST files of CF:EV_EID_FLAG keep growing,
>> with no decreasing trend.
>> Later we found this message in the TaskManager log:
>> WARN org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL
>> compaction filter for state < EV_EID_FLAG >: feature is disabled for the
>> state backend
>> After I added the parameter
>> "state.backend.rocksdb.ttl.compaction.filter.enabled: true", the warning
>> disappeared, but after the job completed some checkpoints, every
>> subsequent checkpoint failed. I checked with jstack and found that the
>> failing checkpoint was stuck acquiring state while disk I/O was idle.
>> After removing the
>> "state.backend.rocksdb.ttl.compaction.filter.enabled: true" parameter,
>> checkpoints succeed again. So I'm asking everyone here: is my usage
>> wrong?
>> Andrey Zagrebin-5 wrote > Hi Yang, > > (redirecting this to user mailing list as this is not a dev question) > > I am not sure why the state loading is stuck after enabling the compaction > filter > but the background cleanup of RocksDB state with TTL will not work without > activating the filter. > This happens on RocksDB opening in Flink, before any state is created and > it starts to load. > > Which version of Flink do you use? > Did you try to enable the filter without starting from the > checkpoint, basically from the beginning of the job run? > > Best, > Andrey > > On Fri, Dec 4, 2020 at 11:27 AM Yang Peng < > yangpengklf007@ > > wrote: > >> Hi,I have some questions about state TTL to consult with everybody,the >> statebackend is rocksdb Below is my code: >> -----------------code begin------------- >> private static final String EV_STATE_FLAG = "EV_EID_FLAG"; >> >> StateTtlConfig ttlConfig = StateTtlConfig >> .newBuilder(Time.minutes(60)) >> .updateTtlOnCreateAndWrite() >> .neverReturnExpired() >> .cleanupInRocksdbCompactFilter(1000) >> .build(); >> MapStateDescriptor<String, Integer> eidMapStateDesc = new >> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO, >> BasicTypeInfo.INT_TYPE_INFO); >> eidMapStateDesc.enableTimeToLive(ttlConfig); >> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc); >> >> -----------------code end----------------- >> >> I have set the TTL of the state is 60mins, But after 12 hours, through >> the monitor of rocksdb metric , we found that the sst file of >> CF:EV_EID_FLAG has been increasing, and there is no decreasing trend. 
>> Later we found some information from the taskmanager log:*WARN >> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL >> compaction >> filter for state < EV_EID_FLAG >: feature is disabled for the state >> backend* >> After I added "*state.backend.rocksdb.ttl.compaction.filter.**enabled: >> true*" this parameter, the warn information disappeared, but ater the >> project completed some checkpoints ,The next checkpoint will always >> fail, I >> checked the jstack command and found that the fail checkpoint was stuck >> in >> acquiring state ,disk io is idle;remove the " >> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the >> parameter,the project will resume the checkpoint. So I’m asking >> everyone >> here. Is my usage method wrong? >> Andrey Zagrebin-5 wrote > Hi Yang, > > (redirecting this to user mailing list as this is not a dev question) > > I am not sure why the state loading is stuck after enabling the compaction > filter > but the background cleanup of RocksDB state with TTL will not work without > activating the filter. > This happens on RocksDB opening in Flink, before any state is created and > it starts to load. > > Which version of Flink do you use? > Did you try to enable the filter without starting from the > checkpoint, basically from the beginning of the job run? 
> > Best, > Andrey > > On Fri, Dec 4, 2020 at 11:27 AM Yang Peng < > yangpengklf007@ > > wrote: > >> Hi,I have some questions about state TTL to consult with everybody,the >> statebackend is rocksdb Below is my code: >> -----------------code begin------------- >> private static final String EV_STATE_FLAG = "EV_EID_FLAG"; >> >> StateTtlConfig ttlConfig = StateTtlConfig >> .newBuilder(Time.minutes(60)) >> .updateTtlOnCreateAndWrite() >> .neverReturnExpired() >> .cleanupInRocksdbCompactFilter(1000) >> .build(); >> MapStateDescriptor<String, Integer> eidMapStateDesc = new >> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO, >> BasicTypeInfo.INT_TYPE_INFO); >> eidMapStateDesc.enableTimeToLive(ttlConfig); >> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc); >> >> -----------------code end----------------- >> >> I have set the TTL of the state is 60mins, But after 12 hours, through >> the monitor of rocksdb metric , we found that the sst file of >> CF:EV_EID_FLAG has been increasing, and there is no decreasing trend. >> Later we found some information from the taskmanager log:*WARN >> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL >> compaction >> filter for state < EV_EID_FLAG >: feature is disabled for the state >> backend* >> After I added "*state.backend.rocksdb.ttl.compaction.filter.**enabled: >> true*" this parameter, the warn information disappeared, but ater the >> project completed some checkpoints ,The next checkpoint will always >> fail, I >> checked the jstack command and found that the fail checkpoint was stuck >> in >> acquiring state ,disk io is idle;remove the " >> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the >> parameter,the project will resume the checkpoint. So I’m asking >> everyone >> here. Is my usage method wrong? 
>> Andrey Zagrebin-5 wrote > Hi Yang, > > (redirecting this to user mailing list as this is not a dev question) > > I am not sure why the state loading is stuck after enabling the compaction > filter > but the background cleanup of RocksDB state with TTL will not work without > activating the filter. > This happens on RocksDB opening in Flink, before any state is created and > it starts to load. > > Which version of Flink do you use? > Did you try to enable the filter without starting from the > checkpoint, basically from the beginning of the job run? > > Best, > Andrey > > On Fri, Dec 4, 2020 at 11:27 AM Yang Peng < > yangpengklf007@ > > wrote: > >> Hi,I have some questions about state TTL to consult with everybody,the >> statebackend is rocksdb Below is my code: >> -----------------code begin------------- >> private static final String EV_STATE_FLAG = "EV_EID_FLAG"; >> >> StateTtlConfig ttlConfig = StateTtlConfig >> .newBuilder(Time.minutes(60)) >> .updateTtlOnCreateAndWrite() >> .neverReturnExpired() >> .cleanupInRocksdbCompactFilter(1000) >> .build(); >> MapStateDescriptor<String, Integer> eidMapStateDesc = new >> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO, >> BasicTypeInfo.INT_TYPE_INFO); >> eidMapStateDesc.enableTimeToLive(ttlConfig); >> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc); >> >> -----------------code end----------------- >> >> I have set the TTL of the state is 60mins, But after 12 hours, through >> the monitor of rocksdb metric , we found that the sst file of >> CF:EV_EID_FLAG has been increasing, and there is no decreasing trend. 
>> Later we found some information from the taskmanager log:*WARN >> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL >> compaction >> filter for state < EV_EID_FLAG >: feature is disabled for the state >> backend* >> After I added "*state.backend.rocksdb.ttl.compaction.filter.**enabled: >> true*" this parameter, the warn information disappeared, but ater the >> project completed some checkpoints ,The next checkpoint will always >> fail, I >> checked the jstack command and found that the fail checkpoint was stuck >> in >> acquiring state ,disk io is idle;remove the " >> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the >> parameter,the project will resume the checkpoint. So I’m asking >> everyone >> here. Is my usage method wrong? >> Andrey Zagrebin-5 wrote > Hi Yang, > > (redirecting this to user mailing list as this is not a dev question) > > I am not sure why the state loading is stuck after enabling the compaction > filter > but the background cleanup of RocksDB state with TTL will not work without > activating the filter. > This happens on RocksDB opening in Flink, before any state is created and > it starts to load. > > Which version of Flink do you use? > Did you try to enable the filter without starting from the > checkpoint, basically from the beginning of the job run? 
> > Best, > Andrey > > On Fri, Dec 4, 2020 at 11:27 AM Yang Peng < > yangpengklf007@ > > wrote: > >> Hi,I have some questions about state TTL to consult with everybody,the >> statebackend is rocksdb Below is my code: >> -----------------code begin------------- >> private static final String EV_STATE_FLAG = "EV_EID_FLAG"; >> >> StateTtlConfig ttlConfig = StateTtlConfig >> .newBuilder(Time.minutes(60)) >> .updateTtlOnCreateAndWrite() >> .neverReturnExpired() >> .cleanupInRocksdbCompactFilter(1000) >> .build(); >> MapStateDescriptor<String, Integer> eidMapStateDesc = new >> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO, >> BasicTypeInfo.INT_TYPE_INFO); >> eidMapStateDesc.enableTimeToLive(ttlConfig); >> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc); >> >> -----------------code end----------------- >> >> I have set the TTL of the state is 60mins, But after 12 hours, through >> the monitor of rocksdb metric , we found that the sst file of >> CF:EV_EID_FLAG has been increasing, and there is no decreasing trend. >> Later we found some information from the taskmanager log:*WARN >> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL >> compaction >> filter for state < EV_EID_FLAG >: feature is disabled for the state >> backend* >> After I added "*state.backend.rocksdb.ttl.compaction.filter.**enabled: >> true*" this parameter, the warn information disappeared, but ater the >> project completed some checkpoints ,The next checkpoint will always >> fail, I >> checked the jstack command and found that the fail checkpoint was stuck >> in >> acquiring state ,disk io is idle;remove the " >> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the >> parameter,the project will resume the checkpoint. So I’m asking >> everyone >> here. Is my usage method wrong? 
>> Andrey Zagrebin-5 wrote > Hi Yang, > > (redirecting this to user mailing list as this is not a dev question) > > I am not sure why the state loading is stuck after enabling the compaction > filter > but the background cleanup of RocksDB state with TTL will not work without > activating the filter. > This happens on RocksDB opening in Flink, before any state is created and > it starts to load. > > Which version of Flink do you use? > Did you try to enable the filter without starting from the > checkpoint, basically from the beginning of the job run? > > Best, > Andrey > > On Fri, Dec 4, 2020 at 11:27 AM Yang Peng < > yangpengklf007@ > > wrote: > >> Hi,I have some questions about state TTL to consult with everybody,the >> statebackend is rocksdb Below is my code: >> -----------------code begin------------- >> private static final String EV_STATE_FLAG = "EV_EID_FLAG"; >> >> StateTtlConfig ttlConfig = StateTtlConfig >> .newBuilder(Time.minutes(60)) >> .updateTtlOnCreateAndWrite() >> .neverReturnExpired() >> .cleanupInRocksdbCompactFilter(1000) >> .build(); >> MapStateDescriptor<String, Integer> eidMapStateDesc = new >> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO, >> BasicTypeInfo.INT_TYPE_INFO); >> eidMapStateDesc.enableTimeToLive(ttlConfig); >> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc); >> >> -----------------code end----------------- >> >> I have set the TTL of the state is 60mins, But after 12 hours, through >> the monitor of rocksdb metric , we found that the sst file of >> CF:EV_EID_FLAG has been increasing, and there is no decreasing trend. 
>> Later we found some information from the taskmanager log:*WARN >> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL >> compaction >> filter for state < EV_EID_FLAG >: feature is disabled for the state >> backend* >> After I added "*state.backend.rocksdb.ttl.compaction.filter.**enabled: >> true*" this parameter, the warn information disappeared, but ater the >> project completed some checkpoints ,The next checkpoint will always >> fail, I >> checked the jstack command and found that the fail checkpoint was stuck >> in >> acquiring state ,disk io is idle;remove the " >> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the >> parameter,the project will resume the checkpoint. So I’m asking >> everyone >> here. Is my usage method wrong? >> Andrey Zagrebin-5 wrote > Hi Yang, > > (redirecting this to user mailing list as this is not a dev question) > > I am not sure why the state loading is stuck after enabling the compaction > filter > but the background cleanup of RocksDB state with TTL will not work without > activating the filter. > This happens on RocksDB opening in Flink, before any state is created and > it starts to load. > > Which version of Flink do you use? > Did you try to enable the filter without starting from the > checkpoint, basically from the beginning of the job run? 
> > Best, > Andrey > > On Fri, Dec 4, 2020 at 11:27 AM Yang Peng < > yangpengklf007@ > > wrote: > >> Hi,I have some questions about state TTL to consult with everybody,the >> statebackend is rocksdb Below is my code: >> -----------------code begin------------- >> private static final String EV_STATE_FLAG = "EV_EID_FLAG"; >> >> StateTtlConfig ttlConfig = StateTtlConfig >> .newBuilder(Time.minutes(60)) >> .updateTtlOnCreateAndWrite() >> .neverReturnExpired() >> .cleanupInRocksdbCompactFilter(1000) >> .build(); >> MapStateDescriptor<String, Integer> eidMapStateDesc = new >> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO, >> BasicTypeInfo.INT_TYPE_INFO); >> eidMapStateDesc.enableTimeToLive(ttlConfig); >> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc); >> >> -----------------code end----------------- >> >> I have set the TTL of the state is 60mins, But after 12 hours, through >> the monitor of rocksdb metric , we found that the sst file of >> CF:EV_EID_FLAG has been increasing, and there is no decreasing trend. >> Later we found some information from the taskmanager log:*WARN >> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL >> compaction >> filter for state < EV_EID_FLAG >: feature is disabled for the state >> backend* >> After I added "*state.backend.rocksdb.ttl.compaction.filter.**enabled: >> true*" this parameter, the warn information disappeared, but ater the >> project completed some checkpoints ,The next checkpoint will always >> fail, I >> checked the jstack command and found that the fail checkpoint was stuck >> in >> acquiring state ,disk io is idle;remove the " >> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the >> parameter,the project will resume the checkpoint. So I’m asking >> everyone >> here. Is my usage method wrong? 
>> Andrey Zagrebin-5 wrote > Hi Yang, > > (redirecting this to user mailing list as this is not a dev question) > > I am not sure why the state loading is stuck after enabling the compaction > filter > but the background cleanup of RocksDB state with TTL will not work without > activating the filter. > This happens on RocksDB opening in Flink, before any state is created and > it starts to load. > > Which version of Flink do you use? > Did you try to enable the filter without starting from the > checkpoint, basically from the beginning of the job run? > > Best, > Andrey > > On Fri, Dec 4, 2020 at 11:27 AM Yang Peng < > yangpengklf007@ > > wrote: > >> Hi,I have some questions about state TTL to consult with everybody,the >> statebackend is rocksdb Below is my code: >> -----------------code begin------------- >> private static final String EV_STATE_FLAG = "EV_EID_FLAG"; >> >> StateTtlConfig ttlConfig = StateTtlConfig >> .newBuilder(Time.minutes(60)) >> .updateTtlOnCreateAndWrite() >> .neverReturnExpired() >> .cleanupInRocksdbCompactFilter(1000) >> .build(); >> MapStateDescriptor<String, Integer> eidMapStateDesc = new >> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO, >> BasicTypeInfo.INT_TYPE_INFO); >> eidMapStateDesc.enableTimeToLive(ttlConfig); >> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc); >> >> -----------------code end----------------- >> >> I have set the TTL of the state is 60mins, But after 12 hours, through >> the monitor of rocksdb metric , we found that the sst file of >> CF:EV_EID_FLAG has been increasing, and there is no decreasing trend. 
>> Later we found some information from the taskmanager log:*WARN >> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL >> compaction >> filter for state < EV_EID_FLAG >: feature is disabled for the state >> backend* >> After I added "*state.backend.rocksdb.ttl.compaction.filter.**enabled: >> true*" this parameter, the warn information disappeared, but ater the >> project completed some checkpoints ,The next checkpoint will always >> fail, I >> checked the jstack command and found that the fail checkpoint was stuck >> in >> acquiring state ,disk io is idle;remove the " >> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the >> parameter,the project will resume the checkpoint. So I’m asking >> everyone >> here. Is my usage method wrong? >> Andrey Zagrebin-5 wrote > Hi Yang, > > (redirecting this to user mailing list as this is not a dev question) > > I am not sure why the state loading is stuck after enabling the compaction > filter > but the background cleanup of RocksDB state with TTL will not work without > activating the filter. > This happens on RocksDB opening in Flink, before any state is created and > it starts to load. > > Which version of Flink do you use? > Did you try to enable the filter without starting from the > checkpoint, basically from the beginning of the job run? 
> > Best, > Andrey > > On Fri, Dec 4, 2020 at 11:27 AM Yang Peng < > yangpengklf007@ > > wrote: > >> Hi,I have some questions about state TTL to consult with everybody,the >> statebackend is rocksdb Below is my code: >> -----------------code begin------------- >> private static final String EV_STATE_FLAG = "EV_EID_FLAG"; >> >> StateTtlConfig ttlConfig = StateTtlConfig >> .newBuilder(Time.minutes(60)) >> .updateTtlOnCreateAndWrite() >> .neverReturnExpired() >> .cleanupInRocksdbCompactFilter(1000) >> .build(); >> MapStateDescriptor<String, Integer> eidMapStateDesc = new >> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO, >> BasicTypeInfo.INT_TYPE_INFO); >> eidMapStateDesc.enableTimeToLive(ttlConfig); >> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc); >> >> -----------------code end----------------- >> >> I have set the TTL of the state is 60mins, But after 12 hours, through >> the monitor of rocksdb metric , we found that the sst file of >> CF:EV_EID_FLAG has been increasing, and there is no decreasing trend. >> Later we found some information from the taskmanager log:*WARN >> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL >> compaction >> filter for state < EV_EID_FLAG >: feature is disabled for the state >> backend* >> After I added "*state.backend.rocksdb.ttl.compaction.filter.**enabled: >> true*" this parameter, the warn information disappeared, but ater the >> project completed some checkpoints ,The next checkpoint will always >> fail, I >> checked the jstack command and found that the fail checkpoint was stuck >> in >> acquiring state ,disk io is idle;remove the " >> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the >> parameter,the project will resume the checkpoint. So I’m asking >> everyone >> here. Is my usage method wrong? 
-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi Yun Tang,
I restarted the job, and it has been running for 12 hours so far. No checkpoint has failed during that time, but some checkpoints take a long time to complete.

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1922/checkpoint_time.png>

These are the details of the checkpoint execution. Most of the time is spent in the synchronous phase of the checkpoint. The detailed jstack log of this subtask is as follows:

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1922/jstack_log.png>

My guess is that the synchronous phase of the checkpoint takes too long. Is that because the value I passed to .cleanupInRocksdbCompactFilter(1000) is too large?

Best,
Yang
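
A note on the parameter in question: as far as the Flink State TTL documentation describes it, the argument of cleanupInRocksdbCompactFilter is the number of state entries the compaction filter processes before it asks Flink for the current timestamp again, and 1000 is the documented default. A larger value should therefore mean fewer timestamp queries (each one is a JNI call from native code), at the cost of a staler timestamp for the expiration check. The plain-Java sketch below is only a toy model of that counter, not Flink or RocksDB code; the class name CompactionFilterTimestampSketch and the method countTimestampQueries are made up for illustration.

```java
public class CompactionFilterTimestampSketch {

    /**
     * Toy model (an assumption, not Flink internals): counts how often a filter
     * that refreshes its cached "current time" after every
     * queryTimeAfterNumEntries processed entries would query the clock while
     * scanning totalEntries entries.
     */
    public static long countTimestampQueries(long totalEntries, long queryTimeAfterNumEntries) {
        if (queryTimeAfterNumEntries <= 0) {
            throw new IllegalArgumentException("queryTimeAfterNumEntries must be positive");
        }
        long queries = 0;
        long processedSinceLastQuery = 0;
        for (long i = 0; i < totalEntries; i++) {
            processedSinceLastQuery++;
            // Once the threshold is reached, refresh the timestamp and reset the counter.
            if (processedSinceLastQuery == queryTimeAfterNumEntries) {
                queries++;
                processedSinceLastQuery = 0;
            }
        }
        return queries;
    }

    public static void main(String[] args) {
        long entries = 1_000_000L;
        // Compare a few thresholds, including the documented default of 1000.
        for (long threshold : new long[] {1L, 100L, 1000L, 10_000L}) {
            System.out.println("queryTimeAfterNumEntries=" + threshold
                    + " -> " + countTimestampQueries(entries, threshold) + " timestamp queries");
        }
    }
}
```

In this model, scanning 1,000,000 entries with a threshold of 1000 triggers 1,000 timestamp queries, while a threshold of 1 triggers 1,000,000. If the model matches the real filter, a value of 1000 is unlikely to be "too large" in the sense of adding work, so the long synchronous phase probably needs another explanation; the jstack output and a flame graph remain the better evidence.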