Flink 1.3.2

I have 1 VM for the job manager and another for the task manager. I have a custom windowing trigger, shown below.

My checkpoint data is not being cleared. I have tried injecting a fileStateThresholdSize when instantiating the FsStateBackend object, but that didn't work. I have also tried explicitly setting state.checkpoints.num-retained: 1 in the flink-conf.yaml file, but that didn't work either.

Not sure what else to try; can someone suggest anything? Thanks in advance.

Ryan

======================================================

/**
 * https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#built-in-and-custom-triggers
 */
public class ThresholdTrigger extends Trigger<MonitorProbe, TimeWindow> {

    private static final Logger LOG = LoggerFactory.getLogger(ThresholdTrigger.class);
    private static final long serialVersionUID = 1L;
    private static final SimpleDateFormat sdf = new SimpleDateFormat("dd HH:mm:ss a");

    private final ValueStateDescriptor<Integer> maxCountDesc =
            new ValueStateDescriptor<>(
                    "max",
                    TypeInformation.of(new TypeHint<Integer>() {}));

    private final ReducingStateDescriptor<Integer> currentCountDesc =
            new ReducingStateDescriptor<>(
                    "count",
                    new Sum(),
                    IntSerializer.INSTANCE);

    @Override
    public TriggerResult onElement(MonitorProbe probe, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window, fire immediately
            clear(window, ctx);
            return TriggerResult.FIRE_AND_PURGE;
        }

        ValueState<Integer> maxCount = ctx.getPartitionedState(maxCountDesc);
        ReducingState<Integer> currentCount = ctx.getPartitionedState(currentCountDesc);
        currentCount.add(1);

        if (maxCount.value() == null) {
            maxCount.update(probe.getThresholdConfig().getSampleSize());
        }

        LOG.info("{} Window: {} - {} ({} - {}), Total Sample Size: [{}/{}]",
                probe.getLoggingKey(),
                window.getStart(), window.getEnd(),
                sdf.format(new Date(window.getStart())),
                sdf.format(new Date(window.getEnd())),
                currentCount.get(), maxCount.value());

        if (currentCount.get().equals(maxCount.value())) {
            clear(window, ctx);
            return TriggerResult.FIRE_AND_PURGE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        throw new UnsupportedOperationException("This is not a processing-time trigger");
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {

        ReducingState<Integer> currentCount = ctx.getPartitionedState(currentCountDesc);
        ValueState<Integer> maxCount = ctx.getPartitionedState(maxCountDesc);

        if (currentCount.get().equals(maxCount.value())) {
            clear(window, ctx);
            return TriggerResult.FIRE_AND_PURGE;
        } else {
            clear(window, ctx);
            return TriggerResult.PURGE;
        }
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(currentCountDesc).clear();
        ctx.getPartitionedState(maxCountDesc).clear();
    }

    @Override
    public String toString() {
        return "ThresholdTrigger(" + maxCountDesc + ")";
    }

    private static class Sum implements ReduceFunction<Integer> {

        private static final long serialVersionUID = 1L;

        @Override
        public Integer reduce(Integer value1, Integer value2) throws Exception {
            return value1 + value2;
        }
    }
}

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
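[For context, a rough sketch of the kind of checkpoint configuration being discussed above. This is not Ryan's actual config; the directory path is an example, and the exact keys should be double-checked against the Flink 1.3 configuration docs:]

```yaml
# conf/flink-conf.yaml -- illustrative values only
# use the filesystem (FsStateBackend) state backend
state.backend: filesystem
# directory for FsStateBackend checkpoint data (example path)
state.backend.fs.checkpointdir: file:///var/data/flink/checkpoints
# number of completed checkpoints to retain (the setting mentioned above)
state.checkpoints.num-retained: 1
```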
Hi,
I just tried out checkpointing with the FsStateBackend in 1.3.2 and everything works as expected for me. Can you give a bit more detail about what you mean by "checkpoint data is not clearing"? For example, is it not cleaned up while the job is running, accumulating "chk-[ID]" directories, or is something left over across multiple restarts? Which filesystem are you using for the checkpoints, e.g. local, HDFS, S3, ...? Does this also happen for other jobs?

Best,
Stefan

> On 08.09.2017 at 03:17, rnosworthy <[hidden email]> wrote:
>
> Flink 1.3.2
> [original message quoted in full above]
Thanks for the response.
That's correct, they do not get purged/deleted while the job is running. I have 3 concurrent jobs running, and there are 3 directories in the data directory:

/var/data/flink/2375c69006bfeca9644171f31b444dff
/var/data/flink/c3264bb6d5e068d6440bbb21069b7d28
/var/data/flink/f81d50eb4644cdf65f8f0513713c9d61

In each of those folders there is chk-1 all the way up to chk-2777, for example.

I do not have HDFS; I have the Flink task manager on a Debian VM.
Hi!
Checkpoints in Flink need to go to a file system that is accessible across machines; otherwise there could be no recovery of the data of a failed machine. The cleanup is also triggered by a different node than the node that writes the checkpoints, hence you see no cleanup in your setup.

Best,
Stephan

> On Fri, Sep 8, 2017 at 6:19 PM, rnosworthy <[hidden email]> wrote:
> Thanks for the response. [...]
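[The point above is that every node, including the one responsible for cleanup, must resolve the checkpoint URI to the same storage. Without HDFS or S3, one common way to illustrate this is a shared network mount (e.g. NFS) mounted at the same path on both VMs. A hedged sketch -- the mount path is hypothetical, and the keys should be verified against the Flink 1.3 docs:]

```yaml
# conf/flink-conf.yaml on EVERY node (job manager and task manager).
# file:///mnt/flink-checkpoints is assumed to be a shared mount, e.g. an
# NFS export mounted at the same path on all machines, so that any node
# can write, read, and clean up checkpoint data.
state.backend: filesystem
state.backend.fs.checkpointdir: file:///mnt/flink-checkpoints
```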
Can I utilize disk on the job manager for this, or do I need a dedicated disk storage VM?

How do I specify not only the directory but also the IP address of the checkpoint data directory?

Are there any docs on configuring a state backend without using HDFS or S3?

Thanks for your help,
Ryan