Is there a way to use stream API with this program?

Is there a way to use stream API with this program?

Flavio Pompermaier
Hi to all,
I was trying to port another job we have from the DataSet API to the DataStream API.
The legacy program was basically doing a dataset.mapPartition().reduce(), so I tried to replicate it with:

 final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
 final DataStream<Row> input = env.fromElements(//
        Row.of(1.0), //
        Row.of(2.0), //
        Row.of(3.0), //
        Row.of(5.0), //
        Row.of(6.0)).returns(new RowTypeInfo(columnType));
 input.map(new SubtaskIndexAssigner(columnType))
        .keyBy(t -> t.f0)
        .window(GlobalWindows.create())
        .trigger(PurgingTrigger.of(new CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
        .process(...)

Unfortunately the program exits before ever reaching the process function (moreover, I need to add another window + trigger after it before adding the final reduce function).
Is there a way to do this with the DataStream API, or should I stick with the DataSet API for the moment (until batch execution is fully supported)? I append below all the code required to test the job.

Best,
Flavio

-----------------------------------------------------------------

package org.apache.flink.stats.sketches;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class Test {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    env.setParallelism(1);

    final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
    final DataStream<Row> input = env.fromElements(//
        Row.of(1.0), //
        Row.of(2.0), //
        Row.of(3.0), //
        Row.of(5.0), //
        Row.of(6.0)).returns(new RowTypeInfo(columnType));
    final DataStream<Row> out = input.map(new SubtaskIndexAssigner(columnType))//
        .keyBy(t -> t.f0)//
        .window(GlobalWindows.create())
        .trigger(PurgingTrigger.of(new CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
        .process(new ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer, GlobalWindow>() {

          @Override
          public void process(Integer key,
              ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer, GlobalWindow>.Context context,
              Iterable<Tuple2<Integer, Row>> it, Collector<Row> out) throws Exception {
            for (Tuple2<Integer, Row> tuple : it) {
              out.collect(Row.of(tuple.f1.getField(0).toString()));
            }

          }
        }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));
    out.writeUsingOutputFormat(new PrintingOutputFormat<Row>());
    env.execute();
  }

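  /**
   * Pairs each row with the index of the subtask that processed it, so the stream can
   * afterwards be keyed by partition (mimicking the DataSet mapPartition semantics).
   */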
  private static final class SubtaskIndexAssigner extends RichMapFunction<Row, Tuple2<Integer, Row>>
      implements ResultTypeQueryable<Tuple2<Integer, Row>> {
    private static final long serialVersionUID = 1L;

    private int myTaskId;
    private TypeInformation<?> columnType;

    public SubtaskIndexAssigner(TypeInformation<?> columnType) {
      this.columnType = columnType;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
      this.myTaskId = getRuntimeContext().getIndexOfThisSubtask();
    }

    @Override
    public Tuple2<Integer, Row> map(Row row) throws Exception {
      return Tuple2.of(myTaskId, row);
    }

    @Override
    public TypeInformation<Tuple2<Integer, Row>> getProducedType() {
      return new TupleTypeInfo<Tuple2<Integer, Row>>(BasicTypeInfo.INT_TYPE_INFO,
          new RowTypeInfo(columnType));
    }
  }

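  /**
   * Trigger that fires (and purges) the global window either once {@code maxCount} elements
   * have been counted for the key, or when a processing-time timer aligned to {@code maxTime}
   * milliseconds goes off, whichever happens first.
   */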
  private static class CountWithTimeoutTriggerPartition
      extends Trigger<Tuple2<Integer, Row>, GlobalWindow> {

    private static final long serialVersionUID = 1L;
    private final long maxCount;
    private final long maxTime;

    private final ReducingStateDescriptor<Long> countstateDesc =
        new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
    private final ReducingStateDescriptor<Long> timestateDesc =
        new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

    public CountWithTimeoutTriggerPartition(long maxTime, long maxCount) {
      this.maxCount = maxCount;
      this.maxTime = maxTime;
    }

    public CountWithTimeoutTriggerPartition(Time maxTime, long maxCount) {
      this(maxTime.toMilliseconds(), maxCount);
    }

    @Override
    public TriggerResult onElement(Tuple2<Integer, Row> element, long timestamp,
        GlobalWindow window,
        org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
        throws Exception {

      ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timestateDesc);

      timestamp = ctx.getCurrentProcessingTime();

      if (fireTimestamp.get() == null) {
        long start = timestamp - (timestamp % maxTime);
        long nextFireTimestamp = start + maxTime;

        ctx.registerProcessingTimeTimer(nextFireTimestamp);

        fireTimestamp.add(nextFireTimestamp);
        return TriggerResult.CONTINUE;
      }
      ReducingState<Long> count = ctx.getPartitionedState(countstateDesc);
      count.add(1L);
      if (count.get() >= maxCount) {
        count.clear();
        fireTimestamp.clear();
        return TriggerResult.FIRE_AND_PURGE;
      }
      return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx)
        throws Exception {
      ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timestateDesc);
      ReducingState<Long> count = ctx.getPartitionedState(countstateDesc);
      if (fireTimestamp.get().equals(time)) {
        count.clear();
        fireTimestamp.clear();
        fireTimestamp.add(time + maxTime);
        ctx.registerProcessingTimeTimer(time + maxTime);
        return TriggerResult.FIRE_AND_PURGE;
      }
      return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(@SuppressWarnings("unused") long time,
        @SuppressWarnings("unused") GlobalWindow window,
        @SuppressWarnings("unused") TriggerContext ctx) throws Exception {
      return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
      ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timestateDesc);
      long timestamp = fireTimestamp.get();
      ctx.deleteProcessingTimeTimer(timestamp);
      fireTimestamp.clear();
      ctx.getPartitionedState(countstateDesc).clear();
    }

    @Override
    public boolean canMerge() {
      return true;
    }

    @Override
    public void onMerge(GlobalWindow window, OnMergeContext ctx) {
      ctx.mergePartitionedState(countstateDesc);
      ctx.mergePartitionedState(timestateDesc);
    }

    class Sum implements ReduceFunction<Long> {
      private static final long serialVersionUID = 1L;

      @Override
      public Long reduce(Long value1, Long value2) throws Exception {
        return value1 + value2;
      }
    }

    class Min implements ReduceFunction<Long> {
      private static final long serialVersionUID = 1L;

      @Override
      public Long reduce(Long value1, Long value2) throws Exception {
        return Math.min(value1, value2);
      }
    }
  }

}

Re: Is there a way to use stream API with this program?

Piotr Nowojski-4
Hi,

I'm afraid there is no out-of-the-box way of doing this. I've created a ticket [1] to write down and document a discussion that we had about this issue in the past.

The issue is that currently, untriggered processing-time timers are ignored at end of input, and it seems there may be no single perfect way to handle this for all cases; it probably needs to be customized.

Maybe you could:
1. extend `WindowOperator` (`MyWindowOperator`)
2. implement `org.apache.flink.streaming.api.operators.BoundedOneInput#endInput` in your `MyWindowOperator`
3. inside `MyWindowOperator#endInput` invoke `internalTimerService.forEachProcessingTimeTimer(...)` and:
  a) manually trigger the timers via `WindowOperator#onProcessingTime`
  b) delete each manually triggered timer

Piotrek


Re: Is there a way to use stream API with this program?

David Anderson-3
In this use case, couldn't the custom trigger register an event time timer for MAX_WATERMARK, which would be triggered when the bounded input reaches its end? 

David
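
A minimal, untested sketch of that idea on top of the posted code: register an event-time timer for Long.MAX_VALUE (the timestamp of MAX_WATERMARK) and fire when it goes off. The class name EndOfInputTrigger is made up here, and the count/timeout logic of the posted trigger is left out for brevity:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.types.Row;

public class EndOfInputTrigger extends Trigger<Tuple2<Integer, Row>, GlobalWindow> {
  private static final long serialVersionUID = 1L;

  @Override
  public TriggerResult onElement(Tuple2<Integer, Row> element, long timestamp,
      GlobalWindow window, TriggerContext ctx) throws Exception {
    // Registering the same timer again is deduplicated; it fires once the source emits
    // MAX_WATERMARK at the end of the bounded input.
    ctx.registerEventTimeTimer(Long.MAX_VALUE);
    return TriggerResult.CONTINUE; // the count/timeout logic from the posted trigger would go here
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx)
      throws Exception {
    // The final watermark advances event time to Long.MAX_VALUE, so this fires at end of input.
    return time == Long.MAX_VALUE ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx)
      throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
    ctx.deleteEventTimeTimer(Long.MAX_VALUE);
  }
}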

Re: Is there a way to use stream API with this program?

Flavio Pompermaier
Yes, it could... but where should I emit the MAX_WATERMARK, and how do I detect that the input has reached its end?

Re: Is there a way to use stream API with this program?

Piotr Nowojski-4
MAX_WATERMARK should be emitted automatically by the WatermarkAssignerOperator.

Piotrek

Re: Is there a way to use stream API with this program?

David Anderson-3
MAX_WATERMARK is emitted by ContinuousFileReaderOperator and StreamSource when they close.

I think you'll find this just works without your having to do anything to make it happen.

David
