Re: Flink 1.9Version State TTL parameter configuration it does not work


Andrey Zagrebin-5
Hi Yang,

(redirecting this to user mailing list as this is not a dev question)

I am not sure why state loading gets stuck after enabling the compaction filter,
but background cleanup of RocksDB state with TTL will not work unless the filter is activated.
The filter is set up when Flink opens RocksDB, before any state is created and before loading starts.
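
To illustrate why the SST files keep growing while the filter is disabled, here is a minimal toy model in plain Java. It is only a sketch under stated assumptions: the class TtlCompactionSketch and its methods are hypothetical and are not Flink's or RocksDB's actual implementation. It shows that hiding expired entries on read (as neverReturnExpired does) is separate from physically dropping them, which in this model happens only during compaction with the filter enabled.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model only (hypothetical names); not Flink's or RocksDB's real code. */
final class TtlCompactionSketch {
    private final long ttlMillis;
    private final boolean filterEnabled;
    // key -> last-write timestamp, standing in for the timestamp stored with each value
    private final Map<String, Long> stored = new HashMap<>();

    TtlCompactionSketch(long ttlMillis, boolean filterEnabled) {
        this.ttlMillis = ttlMillis;
        this.filterEnabled = filterEnabled;
    }

    void put(String key, long now) {
        stored.put(key, now);
    }

    /** Reads hide expired entries but do not delete them. */
    boolean isVisible(String key, long now) {
        Long ts = stored.get(key);
        return ts != null && now - ts < ttlMillis;
    }

    /** Compaction physically drops expired entries only if the filter is enabled. */
    void compact(long now) {
        if (!filterEnabled) {
            return;
        }
        stored.values().removeIf(ts -> now - ts >= ttlMillis);
    }

    int storedEntries() {
        return stored.size();
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        for (boolean enabled : new boolean[] {false, true}) {
            TtlCompactionSketch db = new TtlCompactionSketch(hour, enabled);
            for (int i = 0; i < 1000; i++) {
                db.put("eid-" + i, 0L);
            }
            db.compact(12 * hour); // 12 hours later, as in the report
            System.out.println("filter enabled=" + enabled
                    + " visible=" + db.isVisible("eid-0", 12 * hour)
                    + " stored=" + db.storedEntries());
        }
    }
}
```

With the filter off, the entries are invisible to reads but still occupy storage after compaction; with it on, compaction removes them.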

Which version of Flink do you use?
Did you try to enable the filter without starting from the checkpoint, basically from the beginning of the job run?

Best,
Andrey

On Fri, Dec 4, 2020 at 11:27 AM Yang Peng <[hidden email]> wrote:
Hi, I have some questions about state TTL. The state backend is RocksDB.
Below is my code:
-----------------code begin-------------
private static final String EV_STATE_FLAG = "EV_EID_FLAG";

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.minutes(60))
        .updateTtlOnCreateAndWrite()
        .neverReturnExpired()
        .cleanupInRocksdbCompactFilter(1000)
        .build();
MapStateDescriptor<String, Integer> eidMapStateDesc = new MapStateDescriptor<>(
        EV_STATE_FLAG, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
eidMapStateDesc.enableTimeToLive(ttlConfig);
eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);

-----------------code end-----------------

I have set the state TTL to 60 minutes, but after 12 hours the RocksDB
metrics show that the SST files of column family EV_EID_FLAG keep growing,
with no decreasing trend. Later we found this in the TaskManager log:

WARN org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL compaction
filter for state < EV_EID_FLAG >: feature is disabled for the state backend

After I added the parameter
"state.backend.rocksdb.ttl.compaction.filter.enabled: true", the warning
disappeared, but after the job completed some checkpoints, every subsequent
checkpoint failed. I checked with jstack and found that the failing
checkpoint was stuck acquiring state, while disk I/O was idle. After removing
the "state.backend.rocksdb.ttl.compaction.filter.enabled: true" parameter,
checkpoints complete again. So I am asking everyone here: is my usage wrong?

Yun Tang
Hi Yang,


Why did your checkpoint fail: did it expire, or did it fail due to an error?

Could you paste the jstack output showing what RocksDB is doing during the checkpoint?

BTW, you could also use async-profiler [1] to see where CPU time is spent; this tool can help show what RocksDB is doing.


Best
Yun Tang



pengyang
Hi Yun Tang, thanks for your help. The checkpoint failed because it expired. Sorry, the jstack log was lost; I will restart the job and save the jstack log. We are also trying out the flame graph. Thank you very much.


pengyang
In reply to this post by Andrey Zagrebin-5
Hi Andrey,
    Thanks for your help. My job is running on Flink 1.9. I restarted
the job without restoring from a checkpoint, and it has been running for
12 hours so far. No checkpoint has failed so far, but the time a checkpoint
takes varies greatly, from 1 second to 8 minutes. See Figure 1 for the
checkpoint durations:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1922/checkpoint_time.png>
The figure shows that the time is mainly spent in the synchronous phase of
the checkpoint on subtask 41. The jstack output for subtask 41 is as
follows:
"曝光数据滤重2 (41/60)" #113 prio=5 os_prio=0 tid=0x00007ff4c8476000 nid=0xd2f9 runnable [0x00007ff4b9fa0000]
   java.lang.Thread.State: RUNNABLE
        at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
        at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
        at org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.takeDBNativeCheckpoint(RocksIncrementalSnapshotStrategy.java:249)
        at org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.doSnapshot(RocksIncrementalSnapshotStrategy.java:160)
        at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.snapshot(RocksDBSnapshotStrategyBase.java:126)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:439)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:411)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
        - locked <0x00000003debece00> (a java.lang.Object)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
        at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
        at org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:137)
        at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)

"Async calls on 曝光数据滤重2 (41/60)" #290 daemon prio=5 os_prio=0 tid=0x00007ff4c8f4e800 nid=0xd4bd waiting on condition [0x00007ff4631c4000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000003debd8dc0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)





The above is the detailed jstack output.
I suspect that the earlier checkpoints failed because they timed out: as the
state grew larger, each checkpoint took longer to execute.
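
To check a larger dump for other subtasks sitting in the same native call, a small helper along these lines may be useful. This is only a sketch in plain Java: the class JstackSnapshotScan, its method, and the sample thread names are hypothetical, and it assumes raw jstack text in which each thread header line starts with a double quote.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: lists threads whose stack contains a given frame. */
final class JstackSnapshotScan {

    static List<String> threadsInFrame(String dump, String frame) {
        List<String> hits = new ArrayList<>();
        String current = null;   // name of the thread whose stack is being read
        boolean counted = false; // avoid listing the same thread twice
        for (String line : dump.split("\\R")) {
            String t = line.trim();
            if (t.startsWith("\"")) {
                // Thread header, e.g. "name (41/60)" #113 prio=5 ...
                int end = t.indexOf('"', 1);
                current = end > 0 ? t.substring(1, end) : t;
                counted = false;
            } else if (current != null && !counted && t.contains(frame)) {
                hits.add(current);
                counted = true;
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Shortened sample input; the thread names are made up for illustration.
        String dump = String.join("\n",
                "\"dedup-task (41/60)\" #113 prio=5 os_prio=0 runnable",
                "   java.lang.Thread.State: RUNNABLE",
                "        at org.rocksdb.Checkpoint.createCheckpoint(Native Method)",
                "",
                "\"Async calls on dedup-task (41/60)\" #290 daemon prio=5",
                "   java.lang.Thread.State: WAITING (parking)",
                "        at sun.misc.Unsafe.park(Native Method)");
        for (String name : threadsInFrame(dump, "org.rocksdb.Checkpoint.createCheckpoint")) {
            System.out.println(name);
        }
    }
}
```

Running it over several dumps taken a few seconds apart would show whether the same subtask stays inside createCheckpoint for the whole synchronous phase.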

Best
Yang



Andrey Zagrebin-5 wrote

> Hi Yang,
>
> (redirecting this to user mailing list as this is not a dev question)
>
> I am not sure why the state loading is stuck after enabling the compaction
> filter
> but the background cleanup of RocksDB state with TTL will not work without
> activating the filter.
> This happens on RocksDB opening in Flink, before any state is created and
> it starts to load.
>
> Which version of Flink do you use?
> Did you try to enable the filter without starting from the
> checkpoint, basically from the beginning of the job run?
>
> Best,
> Andrey
>
> On Fri, Dec 4, 2020 at 11:27 AM Yang Peng &lt;

> yangpengklf007@

> &gt; wrote:
>
>> Hi,I have some questions about state TTL to consult with everybody,the
>> statebackend is rocksdb  Below is my code:
>> -----------------code begin-------------
>> private static final String EV_STATE_FLAG = "EV_EID_FLAG";
>>
>> StateTtlConfig ttlConfig = StateTtlConfig
>>         .newBuilder(Time.minutes(60))
>>         .updateTtlOnCreateAndWrite()
>>         .neverReturnExpired()
>>         .cleanupInRocksdbCompactFilter(1000)
>>         .build();
>> MapStateDescriptor&lt;String, Integer&gt; eidMapStateDesc = new
>> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO,
>>         BasicTypeInfo.INT_TYPE_INFO);
>> eidMapStateDesc.enableTimeToLive(ttlConfig);
>> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);
>>
>> -----------------code end-----------------
>>
>> I  have set the TTL of the state is 60mins, But after 12 hours,  through
>> the monitor of rocksdb metric , we found that the sst file of
>>  CF:EV_EID_FLAG  has been increasing, and there is no decreasing trend.
>> Later we found some information from the taskmanager log:*WARN
>> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL
>> compaction
>> filter for state < EV_EID_FLAG >: feature is disabled for the state
>> backend*
>> After I added  "*state.backend.rocksdb.ttl.compaction.filter.**enabled:
>> true*"  this parameter, the warn information disappeared, but  ater the
>> project completed some checkpoints ,The next   checkpoint will always
>> fail, I
>> checked the jstack command and found that the fail checkpoint was stuck
>> in
>> acquiring state ,disk io is idle;remove the  "
>> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the
>> parameter,the project  will resume the checkpoint.  So I’m asking
>> everyone
>> here. Is my usage method wrong?
>>


Andrey Zagrebin-5 wrote

> Hi Yang,
>
> (redirecting this to user mailing list as this is not a dev question)
>
> I am not sure why the state loading is stuck after enabling the compaction
> filter
> but the background cleanup of RocksDB state with TTL will not work without
> activating the filter.
> This happens on RocksDB opening in Flink, before any state is created and
> it starts to load.
>
> Which version of Flink do you use?
> Did you try to enable the filter without starting from the
> checkpoint, basically from the beginning of the job run?
>
> Best,
> Andrey
>
> On Fri, Dec 4, 2020 at 11:27 AM Yang Peng &lt;

> yangpengklf007@

> &gt; wrote:
>
>> Hi,I have some questions about state TTL to consult with everybody,the
>> statebackend is rocksdb  Below is my code:
>> -----------------code begin-------------
>> private static final String EV_STATE_FLAG = "EV_EID_FLAG";
>>
>> StateTtlConfig ttlConfig = StateTtlConfig
>>         .newBuilder(Time.minutes(60))
>>         .updateTtlOnCreateAndWrite()
>>         .neverReturnExpired()
>>         .cleanupInRocksdbCompactFilter(1000)
>>         .build();
>> MapStateDescriptor&lt;String, Integer&gt; eidMapStateDesc = new
>> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO,
>>         BasicTypeInfo.INT_TYPE_INFO);
>> eidMapStateDesc.enableTimeToLive(ttlConfig);
>> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);
>>
>> -----------------code end-----------------
>>
>> I  have set the TTL of the state is 60mins, But after 12 hours,  through
>> the monitor of rocksdb metric , we found that the sst file of
>>  CF:EV_EID_FLAG  has been increasing, and there is no decreasing trend.
>> Later we found some information from the taskmanager log:*WARN
>> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL
>> compaction
>> filter for state < EV_EID_FLAG >: feature is disabled for the state
>> backend*
>> After I added  "*state.backend.rocksdb.ttl.compaction.filter.**enabled:
>> true*"  this parameter, the warn information disappeared, but  ater the
>> project completed some checkpoints ,The next   checkpoint will always
>> fail, I
>> checked the jstack command and found that the fail checkpoint was stuck
>> in
>> acquiring state ,disk io is idle;remove the  "
>> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the
>> parameter,the project  will resume the checkpoint.  So I’m asking
>> everyone
>> here. Is my usage method wrong?
>>


Andrey Zagrebin-5 wrote

> Hi Yang,
>
> (redirecting this to user mailing list as this is not a dev question)
>
> I am not sure why the state loading is stuck after enabling the compaction
> filter
> but the background cleanup of RocksDB state with TTL will not work without
> activating the filter.
> This happens on RocksDB opening in Flink, before any state is created and
> it starts to load.
>
> Which version of Flink do you use?
> Did you try to enable the filter without starting from the
> checkpoint, basically from the beginning of the job run?
>
> Best,
> Andrey
>
> On Fri, Dec 4, 2020 at 11:27 AM Yang Peng &lt;

> yangpengklf007@

> &gt; wrote:
>
>> Hi,I have some questions about state TTL to consult with everybody,the
>> statebackend is rocksdb  Below is my code:
>> -----------------code begin-------------
>> private static final String EV_STATE_FLAG = "EV_EID_FLAG";
>>
>> StateTtlConfig ttlConfig = StateTtlConfig
>>         .newBuilder(Time.minutes(60))
>>         .updateTtlOnCreateAndWrite()
>>         .neverReturnExpired()
>>         .cleanupInRocksdbCompactFilter(1000)
>>         .build();
>> MapStateDescriptor&lt;String, Integer&gt; eidMapStateDesc = new
>> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO,
>>         BasicTypeInfo.INT_TYPE_INFO);
>> eidMapStateDesc.enableTimeToLive(ttlConfig);
>> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);
>>
>> -----------------code end-----------------
>>
>> I  have set the TTL of the state is 60mins, But after 12 hours,  through
>> the monitor of rocksdb metric , we found that the sst file of
>>  CF:EV_EID_FLAG  has been increasing, and there is no decreasing trend.
>> Later we found some information from the taskmanager log:*WARN
>> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL
>> compaction
>> filter for state < EV_EID_FLAG >: feature is disabled for the state
>> backend*
>> After I added  "*state.backend.rocksdb.ttl.compaction.filter.**enabled:
>> true*"  this parameter, the warn information disappeared, but  ater the
>> project completed some checkpoints ,The next   checkpoint will always
>> fail, I
>> checked the jstack command and found that the fail checkpoint was stuck
>> in
>> acquiring state ,disk io is idle;remove the  "
>> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the
>> parameter,the project  will resume the checkpoint.  So I’m asking
>> everyone
>> here. Is my usage method wrong?
>>


Andrey Zagrebin-5 wrote

> Hi Yang,
>
> (redirecting this to user mailing list as this is not a dev question)
>
> I am not sure why the state loading is stuck after enabling the compaction
> filter
> but the background cleanup of RocksDB state with TTL will not work without
> activating the filter.
> This happens on RocksDB opening in Flink, before any state is created and
> it starts to load.
>
> Which version of Flink do you use?
> Did you try to enable the filter without starting from the
> checkpoint, basically from the beginning of the job run?
>
> Best,
> Andrey
>
> On Fri, Dec 4, 2020 at 11:27 AM Yang Peng &lt;

> yangpengklf007@

> &gt; wrote:
>
>> Hi,I have some questions about state TTL to consult with everybody,the
>> statebackend is rocksdb  Below is my code:
>> -----------------code begin-------------
>> private static final String EV_STATE_FLAG = "EV_EID_FLAG";
>>
>> StateTtlConfig ttlConfig = StateTtlConfig
>>         .newBuilder(Time.minutes(60))
>>         .updateTtlOnCreateAndWrite()
>>         .neverReturnExpired()
>>         .cleanupInRocksdbCompactFilter(1000)
>>         .build();
>> MapStateDescriptor&lt;String, Integer&gt; eidMapStateDesc = new
>> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO,
>>         BasicTypeInfo.INT_TYPE_INFO);
>> eidMapStateDesc.enableTimeToLive(ttlConfig);
>> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);
>>
>> -----------------code end-----------------
>>
>> I  have set the TTL of the state is 60mins, But after 12 hours,  through
>> the monitor of rocksdb metric , we found that the sst file of
>>  CF:EV_EID_FLAG  has been increasing, and there is no decreasing trend.
>> Later we found some information from the taskmanager log:*WARN
>> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL
>> compaction
>> filter for state < EV_EID_FLAG >: feature is disabled for the state
>> backend*
>> After I added  "*state.backend.rocksdb.ttl.compaction.filter.**enabled:
>> true*"  this parameter, the warn information disappeared, but  ater the
>> project completed some checkpoints ,The next   checkpoint will always
>> fail, I
>> checked the jstack command and found that the fail checkpoint was stuck
>> in
>> acquiring state ,disk io is idle;remove the  "
>> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the
>> parameter,the project  will resume the checkpoint.  So I’m asking
>> everyone
>> here. Is my usage method wrong?
>>


Andrey Zagrebin-5 wrote

> Hi Yang,
>
> (redirecting this to user mailing list as this is not a dev question)
>
> I am not sure why the state loading is stuck after enabling the compaction
> filter
> but the background cleanup of RocksDB state with TTL will not work without
> activating the filter.
> This happens on RocksDB opening in Flink, before any state is created and
> it starts to load.
>
> Which version of Flink do you use?
> Did you try to enable the filter without starting from the
> checkpoint, basically from the beginning of the job run?
>
> Best,
> Andrey
>
> On Fri, Dec 4, 2020 at 11:27 AM Yang Peng &lt;

> yangpengklf007@

> &gt; wrote:
>
>> Hi,I have some questions about state TTL to consult with everybody,the
>> statebackend is rocksdb  Below is my code:
>> -----------------code begin-------------
>> private static final String EV_STATE_FLAG = "EV_EID_FLAG";
>>
>> StateTtlConfig ttlConfig = StateTtlConfig
>>         .newBuilder(Time.minutes(60))
>>         .updateTtlOnCreateAndWrite()
>>         .neverReturnExpired()
>>         .cleanupInRocksdbCompactFilter(1000)
>>         .build();
>> MapStateDescriptor&lt;String, Integer&gt; eidMapStateDesc = new
>> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO,
>>         BasicTypeInfo.INT_TYPE_INFO);
>> eidMapStateDesc.enableTimeToLive(ttlConfig);
>> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);
>>
>> -----------------code end-----------------
>>
>> I  have set the TTL of the state is 60mins, But after 12 hours,  through
>> the monitor of rocksdb metric , we found that the sst file of
>>  CF:EV_EID_FLAG  has been increasing, and there is no decreasing trend.
>> Later we found some information from the taskmanager log:*WARN
>> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL
>> compaction
>> filter for state < EV_EID_FLAG >: feature is disabled for the state
>> backend*
>> After I added  "*state.backend.rocksdb.ttl.compaction.filter.**enabled:
>> true*"  this parameter, the warn information disappeared, but  ater the
>> project completed some checkpoints ,The next   checkpoint will always
>> fail, I
>> checked the jstack command and found that the fail checkpoint was stuck
>> in
>> acquiring state ,disk io is idle;remove the  "
>> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the
>> parameter,the project  will resume the checkpoint.  So I’m asking
>> everyone
>> here. Is my usage method wrong?
>>


Andrey Zagrebin-5 wrote

> Hi Yang,
>
> (redirecting this to user mailing list as this is not a dev question)
>
> I am not sure why the state loading is stuck after enabling the compaction
> filter
> but the background cleanup of RocksDB state with TTL will not work without
> activating the filter.
> This happens on RocksDB opening in Flink, before any state is created and
> it starts to load.
>
> Which version of Flink do you use?
> Did you try to enable the filter without starting from the
> checkpoint, basically from the beginning of the job run?
>
> Best,
> Andrey
>
> On Fri, Dec 4, 2020 at 11:27 AM Yang Peng &lt;

> yangpengklf007@

> &gt; wrote:
>
>> Hi,I have some questions about state TTL to consult with everybody,the
>> statebackend is rocksdb  Below is my code:
>> -----------------code begin-------------
>> private static final String EV_STATE_FLAG = "EV_EID_FLAG";
>>
>> StateTtlConfig ttlConfig = StateTtlConfig
>>         .newBuilder(Time.minutes(60))
>>         .updateTtlOnCreateAndWrite()
>>         .neverReturnExpired()
>>         .cleanupInRocksdbCompactFilter(1000)
>>         .build();
>> MapStateDescriptor&lt;String, Integer&gt; eidMapStateDesc = new
>> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO,
>>         BasicTypeInfo.INT_TYPE_INFO);
>> eidMapStateDesc.enableTimeToLive(ttlConfig);
>> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);
>>
>> -----------------code end-----------------
>>
>> I  have set the TTL of the state is 60mins, But after 12 hours,  through
>> the monitor of rocksdb metric , we found that the sst file of
>>  CF:EV_EID_FLAG  has been increasing, and there is no decreasing trend.
>> Later we found some information from the taskmanager log:*WARN
>> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL
>> compaction
>> filter for state < EV_EID_FLAG >: feature is disabled for the state
>> backend*
>> After I added  "*state.backend.rocksdb.ttl.compaction.filter.**enabled:
>> true*"  this parameter, the warn information disappeared, but  ater the
>> project completed some checkpoints ,The next   checkpoint will always
>> fail, I
>> checked the jstack command and found that the fail checkpoint was stuck
>> in
>> acquiring state ,disk io is idle;remove the  "
>> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the
>> parameter,the project  will resume the checkpoint.  So I’m asking
>> everyone
>> here. Is my usage method wrong?
>>


Andrey Zagrebin-5 wrote

> Hi Yang,
>
> (redirecting this to user mailing list as this is not a dev question)
>
> I am not sure why the state loading is stuck after enabling the compaction
> filter
> but the background cleanup of RocksDB state with TTL will not work without
> activating the filter.
> This happens on RocksDB opening in Flink, before any state is created and
> it starts to load.
>
> Which version of Flink do you use?
> Did you try to enable the filter without starting from the
> checkpoint, basically from the beginning of the job run?
>
> Best,
> Andrey
>
> On Fri, Dec 4, 2020 at 11:27 AM Yang Peng &lt;

> yangpengklf007@

> &gt; wrote:
>
>> Hi,I have some questions about state TTL to consult with everybody,the
>> statebackend is rocksdb  Below is my code:
>> -----------------code begin-------------
>> private static final String EV_STATE_FLAG = "EV_EID_FLAG";
>>
>> StateTtlConfig ttlConfig = StateTtlConfig
>>         .newBuilder(Time.minutes(60))
>>         .updateTtlOnCreateAndWrite()
>>         .neverReturnExpired()
>>         .cleanupInRocksdbCompactFilter(1000)
>>         .build();
>> MapStateDescriptor&lt;String, Integer&gt; eidMapStateDesc = new
>> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO,
>>         BasicTypeInfo.INT_TYPE_INFO);
>> eidMapStateDesc.enableTimeToLive(ttlConfig);
>> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);
>>
>> -----------------code end-----------------
>>
>> I  have set the TTL of the state is 60mins, But after 12 hours,  through
>> the monitor of rocksdb metric , we found that the sst file of
>>  CF:EV_EID_FLAG  has been increasing, and there is no decreasing trend.
>> Later we found some information from the taskmanager log:*WARN
>> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL
>> compaction
>> filter for state < EV_EID_FLAG >: feature is disabled for the state
>> backend*
>> After I added  "*state.backend.rocksdb.ttl.compaction.filter.**enabled:
>> true*"  this parameter, the warn information disappeared, but  ater the
>> project completed some checkpoints ,The next   checkpoint will always
>> fail, I
>> checked the jstack command and found that the fail checkpoint was stuck
>> in
>> acquiring state ,disk io is idle;remove the  "
>> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the
>> parameter,the project  will resume the checkpoint.  So I’m asking
>> everyone
>> here. Is my usage method wrong?
>>


Andrey Zagrebin-5 wrote

> Hi Yang,
>
> (redirecting this to user mailing list as this is not a dev question)
>
> I am not sure why the state loading is stuck after enabling the compaction
> filter
> but the background cleanup of RocksDB state with TTL will not work without
> activating the filter.
> This happens on RocksDB opening in Flink, before any state is created and
> it starts to load.
>
> Which version of Flink do you use?
> Did you try to enable the filter without starting from the
> checkpoint, basically from the beginning of the job run?
>
> Best,
> Andrey
>
> On Fri, Dec 4, 2020 at 11:27 AM Yang Peng &lt;

> yangpengklf007@

> &gt; wrote:
>
>> Hi,I have some questions about state TTL to consult with everybody,the
>> statebackend is rocksdb  Below is my code:
>> -----------------code begin-------------
>> private static final String EV_STATE_FLAG = "EV_EID_FLAG";
>>
>> StateTtlConfig ttlConfig = StateTtlConfig
>>         .newBuilder(Time.minutes(60))
>>         .updateTtlOnCreateAndWrite()
>>         .neverReturnExpired()
>>         .cleanupInRocksdbCompactFilter(1000)
>>         .build();
>> MapStateDescriptor&lt;String, Integer&gt; eidMapStateDesc = new
>> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO,
>>         BasicTypeInfo.INT_TYPE_INFO);
>> eidMapStateDesc.enableTimeToLive(ttlConfig);
>> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);
>>
>> -----------------code end-----------------
>>
>> I  have set the TTL of the state is 60mins, But after 12 hours,  through
>> the monitor of rocksdb metric , we found that the sst file of
>>  CF:EV_EID_FLAG  has been increasing, and there is no decreasing trend.
>> Later we found some information from the taskmanager log:*WARN
>> org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL
>> compaction
>> filter for state < EV_EID_FLAG >: feature is disabled for the state
>> backend*
>> After I added  "*state.backend.rocksdb.ttl.compaction.filter.**enabled:
>> true*"  this parameter, the warn information disappeared, but  ater the
>> project completed some checkpoints ,The next   checkpoint will always
>> fail, I
>> checked the jstack command and found that the fail checkpoint was stuck
>> in
>> acquiring state ,disk io is idle;remove the  "
>> *state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the
>> parameter,the project  will resume the checkpoint.  So I’m asking
>> everyone
>> here. Is my usage method wrong?
>>


Andrey Zagrebin-5 wrote

> Hi Yang,
>
> (redirecting this to user mailing list as this is not a dev question)
>
> I am not sure why the state loading is stuck after enabling the compaction
> filter
> but the background cleanup of RocksDB state with TTL will not work without
> activating the filter.
> This happens on RocksDB opening in Flink, before any state is created and
> it starts to load.
>
> Which version of Flink do you use?
> Did you try to enable the filter without starting from the
> checkpoint, basically from the beginning of the job run?
>
> Best,
> Andrey
>
> On Fri, Dec 4, 2020 at 11:27 AM Yang Peng &lt;

> yangpengklf007@

> &gt; wrote:
>
>> Hi,I have some questions about state TTL to consult with everybody,the
>> statebackend is rocksdb  Below is my code:
>> -----------------code begin-------------
>> private static final String EV_STATE_FLAG = "EV_EID_FLAG";
>>
>> StateTtlConfig ttlConfig = StateTtlConfig
>>         .newBuilder(Time.minutes(60))
>>         .updateTtlOnCreateAndWrite()
>>         .neverReturnExpired()
>>         .cleanupInRocksdbCompactFilter(1000)
>>         .build();
>> MapStateDescriptor&lt;String, Integer&gt; eidMapStateDesc = new
>> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO,
>>         BasicTypeInfo.INT_TYPE_INFO);
>> eidMapStateDesc.enableTimeToLive(ttlConfig);
>> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);
>>
>> -----------------code end-----------------
>>
>> I set the state TTL to 60 minutes, but after 12 hours the RocksDB
>> metrics showed that the SST files of CF:EV_EID_FLAG kept growing, with
>> no sign of shrinking. Later we found this in the TaskManager log:
>> WARN org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL
>> compaction filter for state < EV_EID_FLAG >: feature is disabled for
>> the state backend
>> After I added the parameter
>> "state.backend.rocksdb.ttl.compaction.filter.enabled: true", the
>> warning disappeared, but after the job completed some checkpoints,
>> every subsequent checkpoint failed. jstack showed that the failing
>> checkpoint was stuck acquiring state while disk I/O was idle. When I
>> remove the parameter
>> "state.backend.rocksdb.ttl.compaction.filter.enabled: true", the job
>> checkpoints normally again. So I am asking everyone here: is my usage
>> wrong?
>>


--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink 1.9Version State TTL parameter configuration it does not work

pengyang
In reply to this post by Yun Tang
Hi Yun Tang,
    I restarted the job, and it has now been running for 12 hours. No
checkpoint has failed so far, but some checkpoints take a long time to
complete.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1922/checkpoint_time.png>
These are the detailed checkpoint statistics. Most of the time is spent in
the synchronous phase of the checkpoint. The jstack output for this subtask
is as follows:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1922/jstack_log.png>
My guess is that the synchronous phase takes so long because the value I
passed to .cleanupInRocksdbCompactFilter(1000) is too large. Is that the
cause?
Best,
Yang
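
For context on what that argument means: according to the Flink
StateTtlConfig documentation, the value passed to
cleanupInRocksdbCompactFilter is the number of state entries the compaction
filter processes before it asks Flink for the current timestamp again. The
sketch below is only a plain-Java simulation of that counting behaviour, not
Flink or RocksDB code. The class TtlFilterSketch, the method compact and the
fixed clock are hypothetical names made up for illustration.

```java
import java.util.function.LongSupplier;

// Hypothetical simulation, NOT Flink/RocksDB code: it only models how often
// a TTL filter would re-read the clock for a given queryTimeAfterNumEntries.
public class TtlFilterSketch {

    /** Outcome of one simulated compaction pass. */
    static final class Result {
        final long clockQueries;   // how many times the clock was read
        final long droppedEntries; // how many entries were seen as expired

        Result(long clockQueries, long droppedEntries) {
            this.clockQueries = clockQueries;
            this.droppedEntries = droppedEntries;
        }
    }

    static Result compact(long[] lastWriteTimestamps, long ttlMillis,
                          long queryTimeAfterNumEntries, LongSupplier clock) {
        long clockQueries = 0;
        long dropped = 0;
        long cachedNow = 0;
        // Start at the threshold so the very first entry triggers a clock read.
        long entriesSinceQuery = queryTimeAfterNumEntries;
        for (long lastWrite : lastWriteTimestamps) {
            if (entriesSinceQuery >= queryTimeAfterNumEntries) {
                cachedNow = clock.getAsLong(); // stands in for the costly call
                clockQueries++;
                entriesSinceQuery = 0;
            }
            entriesSinceQuery++;
            // An entry counts as expired once its TTL has fully elapsed.
            if (lastWrite + ttlMillis <= cachedNow) {
                dropped++;
            }
        }
        return new Result(clockQueries, dropped);
    }

    public static void main(String[] args) {
        long[] entries = new long[10_000];  // all last written at t = 0
        long ttl = 60 * 60 * 1000L;         // 60 minutes, as in the job above
        LongSupplier clock = () -> 2 * ttl; // fixed clock, two hours later
        for (long n : new long[] {1, 100, 1000}) {
            Result r = compact(entries, ttl, n, clock);
            System.out.println("queryTimeAfterNumEntries=" + n
                    + " clockQueries=" + r.clockQueries
                    + " dropped=" + r.droppedEntries);
        }
    }
}
```

In this model a larger value means fewer timestamp queries per compaction,
not more, so 1000 by itself would not be expected to make the filter slower.
The model says nothing about what actually dominates the synchronous phase
in the job above.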