File System State Backend


File System State Backend

rnosworthy
Flink 1.3.2

I have 1 vm for the job manager and another for task manager.

I have a custom windowing trigger shown below.

My checkpoint data is not clearing.

I have tried passing a file state size threshold when instantiating the
FsStateBackend object, but that didn't work.

I have tried explicitly setting state.checkpoints.num-retained: 1 in
flink-conf.yaml, but that didn't work either.
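
For context, this is the shape of the configuration I am talking about (the
checkpoint directory below is illustrative, not my exact path):

```yaml
# flink-conf.yaml (Flink 1.3.x)
state.backend: filesystem
# Where the FsStateBackend writes checkpoint data:
state.backend.fs.checkpointdir: file:///var/data/flink
# How many completed checkpoints to keep around:
state.checkpoints.num-retained: 1
```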

Not sure what else to try. Can someone suggest anything?

Thanks in advance.

Ryan

======================================================


import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Custom trigger that fires once a configured number of elements has been
 * seen in the window, or purges when the window's event-time timer fires
 * before the threshold is reached.
 *
 * https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#built-in-and-custom-triggers
 */
public class ThresholdTrigger extends Trigger<MonitorProbe, TimeWindow> {

  private static final Logger LOG = LoggerFactory.getLogger(ThresholdTrigger.class);
  private static final long serialVersionUID = 1L;
  // Note: SimpleDateFormat is not thread-safe; used here only for logging.
  private static final SimpleDateFormat sdf = new SimpleDateFormat("dd hh:mm:ss a");

  // Per-key/window threshold (sample size), taken from the first element.
  private final ValueStateDescriptor<Integer> maxCountDesc =
      new ValueStateDescriptor<>(
          "max",
          TypeInformation.of(new TypeHint<Integer>() {}));

  // Per-key/window running element count.
  private final ReducingStateDescriptor<Integer> currentCountDesc =
      new ReducingStateDescriptor<>(
          "count",
          new Sum(),
          IntSerializer.INSTANCE);

  @Override
  public TriggerResult onElement(MonitorProbe probe, long timestamp,
      TimeWindow window, TriggerContext ctx) throws Exception {

    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
      // The watermark is already past the window: fire immediately.
      clear(window, ctx);
      return TriggerResult.FIRE_AND_PURGE;
    }

    ValueState<Integer> maxCount = ctx.getPartitionedState(maxCountDesc);
    ReducingState<Integer> currentCount = ctx.getPartitionedState(currentCountDesc);
    currentCount.add(1);

    if (maxCount.value() == null) {
      maxCount.update(probe.getThresholdConfig().getSampleSize());
    }

    LOG.info("{} Window: {} - {} ({} - {}), Total Sample Size: [{}/{}]",
        probe.getLoggingKey(),
        window.getStart(), window.getEnd(),
        sdf.format(new Date(window.getStart())),
        sdf.format(new Date(window.getEnd())),
        currentCount.get(), maxCount.value());

    if (currentCount.get().equals(maxCount.value())) {
      clear(window, ctx);
      return TriggerResult.FIRE_AND_PURGE;
    } else {
      ctx.registerEventTimeTimer(window.maxTimestamp());
      return TriggerResult.CONTINUE;
    }
  }

  @Override
  public TriggerResult onProcessingTime(long time, TimeWindow window,
      TriggerContext ctx) throws Exception {
    throw new UnsupportedOperationException("This is not a processing-time trigger");
  }

  @Override
  public TriggerResult onEventTime(long time, TimeWindow window,
      TriggerContext ctx) throws Exception {

    ReducingState<Integer> currentCount = ctx.getPartitionedState(currentCountDesc);
    ValueState<Integer> maxCount = ctx.getPartitionedState(maxCountDesc);

    if (currentCount.get().equals(maxCount.value())) {
      clear(window, ctx);
      return TriggerResult.FIRE_AND_PURGE;
    } else {
      // Threshold never reached before the window ended: drop the window state.
      clear(window, ctx);
      return TriggerResult.PURGE;
    }
  }

  @Override
  public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    ctx.getPartitionedState(currentCountDesc).clear();
    ctx.getPartitionedState(maxCountDesc).clear();
  }

  @Override
  public String toString() {
    return "ThresholdTrigger(" + maxCountDesc + ")";
  }

  private static class Sum implements ReduceFunction<Integer> {

    private static final long serialVersionUID = 1L;

    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
      return value1 + value2;
    }
  }
}
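
The trigger's firing rule boils down to a simple comparison of the running
count against the sample-size threshold. A standalone sketch of that rule in
plain Java (no Flink dependencies; all names here are illustrative):

```java
class ThresholdCheck {

  // Returns true when the trigger should FIRE_AND_PURGE, i.e. the running
  // count has reached the sample-size threshold.
  static boolean shouldFire(int currentCount, int maxCount) {
    return currentCount == maxCount;
  }

  public static void main(String[] args) {
    int threshold = 3; // stands in for probe.getThresholdConfig().getSampleSize()
    int count = 0;
    boolean fired = false;
    // Simulate three elements arriving in the window.
    for (int i = 0; i < 3; i++) {
      count += 1; // what ReducingState<Integer> with Sum does per element
      fired = shouldFire(count, threshold);
    }
    System.out.println(fired); // true after the third element
  }
}
```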




Re: File System State Backend

Stefan Richter
Hi,

I just tried out checkpointing with the FsStateBackend in 1.3.2 and everything works as expected for me. Can you give a bit more detail about what you mean by "checkpoint data is not clearing"? For example, is it not cleaned up while the job is running, accumulating "chk-[ID]" directories, or is something left over across restarts? Which filesystem are you using for the checkpoints, e.g. local, HDFS, S3, ...? Does this also happen for other jobs?

Best,
Stefan

> On 08.09.2017 at 03:17, rnosworthy <[hidden email]> wrote:
> [...]


Re: File System State Backend

rnosworthy
Thanks for the response.

That's correct: they do not get purged/deleted while the job is running. I
have 3 concurrent jobs running, and there are 3 directories in the data
directory.

/var/data/flink/2375c69006bfeca9644171f31b444dff
/var/data/flink/c3264bb6d5e068d6440bbb21069b7d28
/var/data/flink/f81d50eb4644cdf65f8f0513713c9d61

In each one of those folders there is chk-1 all the way to chk-2777, for
example.

I do not have HDFS; the Flink task manager runs on a Debian VM.




Re: File System State Backend

Stephan Ewen
Hi!

Checkpoints in Flink need to go to a file system that is accessible across machines. Otherwise there could be no recovery of the data of a failed machine.

The cleanup is also triggered by a different node than the node that writes the checkpoints - hence you see no cleanup in your setup.
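
As a minimal sketch, assuming for example an NFS mount that is visible at the
same path on both VMs (the mount point below is just an example):

```yaml
# flink-conf.yaml on both the JobManager and TaskManager machines
state.backend: filesystem
# Must resolve to the same shared storage on every node:
state.backend.fs.checkpointdir: file:///mnt/shared/flink-checkpoints
```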

Best,
Stephan

On Fri, Sep 8, 2017 at 6:19 PM, rnosworthy <[hidden email]> wrote:
[...]


Re: File System State Backend

rnosworthy
Can I utilize disk on the job manager for this, or do I need a dedicated disk
storage VM?
How do I specify not only the directory but also the IP address of the
checkpoint data directory?
Are there any docs on configuring a state backend without using HDFS or S3?

Thanks for your help.

Ryan


