[QUESTION] How to parallelize with explicit punctuation in Flink?


Filip Niksic

Hi all,


What would be a natural way to implement a parallel version of the following Flink program?


Suppose I have a stream of items of type DataItem with two concrete implementations of DataItem: Value and Barrier. Let’s say that Value holds an integer value, and Barrier acts as explicit punctuation.


public interface DataItem {}

public class Value implements DataItem {
    private final int val;

    public Value(int val) { this.val = val; }

    public int getVal() { return val; }
}

public class Barrier implements DataItem {}


The program should maintain a sum of values seen since the beginning of the stream. On each Barrier, the program should output the sum seen so far.


An obvious way to implement this would be with a FlatMapFunction, maintaining the sum as state and emitting it on each Barrier.


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<DataItem> stream = env.fromElements(DataItem.class,
        new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());

stream.flatMap(new FlatMapFunction<DataItem, Integer>() {
    private int sum = 0;

    @Override
    public void flatMap(DataItem dataItem, Collector<Integer> collector) throws Exception {
        if (dataItem instanceof Value) {
            sum += ((Value) dataItem).getVal();
        } else {
            collector.collect(sum);
        }
    }
}).setParallelism(1).print().setParallelism(1);

env.execute();

// We should see 1 followed by 3 as output


However, such an operator cannot be parallelized, since the order of Values and Barriers matters. That’s why I need to set parallelism to 1 above. Is there a way to rewrite this to exploit parallelism?


(Another reason to set parallelism to 1 above is that I’m assuming there is a single instance of the FlatMapFunction. A proper implementation would take more care in using state. Feel free to comment on that as well.)


Best regards,


Filip Niksic

Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Yun Gao

Hi Filip,

I have one question about the problem: what is the expected behavior when the parallelism of the FlatMapFunction is increased to more than 1? Should each subtask maintain the partial sum of the values it has received, and output that partial sum whenever it receives a Barrier?

Another question: the watermark mechanism in Flink already provides functionality similar to punctuation, so would it be possible to implement the same logic directly with Flink windows?

Best,
Yun


Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Filip Niksic
Hi Yun,

The behavior with increased parallelism should be the same as with no parallelism. In other words, for the input from the previous email, the output should always be 1, 3, regardless of parallelism. Operationally, the partial sums maintained in each subtask should somehow be aggregated before they are output.

To answer the second question, I know that watermarks provide the same functionality. Is there some way to convert the input with explicit punctuation into one with watermarks? I see there is an interface called AssignerWithPunctuatedWatermarks; maybe that's the solution. But I'm not sure how this assigner would be used. For example, it could maintain the number of previously seen Barriers and assign this number as a watermark to each Value, but then this number becomes state that needs to be shared between multiple substreams. Or perhaps the Barriers could somehow be duplicated and sent to each substream? Alternatively, is there some notion of event-based windows that would be triggered by specific user-defined elements in the stream? In such a mechanism the watermarks would perhaps be used internally, but they would not be explicitly exposed to the user.
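To make that last idea a bit more concrete, here is roughly the kind of trigger I have in mind (an untested sketch; BarrierTrigger is just a name I made up): a custom Trigger over a GlobalWindow that fires whenever a Barrier shows up.

public class BarrierTrigger extends Trigger<DataItem, GlobalWindow> {

    @Override
    public TriggerResult onElement(DataItem element, long timestamp, GlobalWindow window, TriggerContext ctx) {
        // Emit the current window contents on every Barrier (without purging,
        // so the sum stays cumulative); otherwise keep accumulating.
        return element instanceof Barrier ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {}
}

Of course, the aggregate function would still have to ignore the Barrier elements themselves, and a non-keyed windowAll with such a trigger would again run with parallelism 1, so by itself this does not answer the parallelization question.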

Best regards,

Filip




Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Chesnay Schepler
In other words, you need a way to partition the stream such that a series of items followed by a barrier is never interrupted.

I'm wondering whether you could just apply DataStream#partitionCustom to your source:
public static class BarrierPartitioner implements Partitioner<DataItem> {

   // Send everything to the current partition; each Barrier closes the current
   // segment and advances to the next partition in round-robin fashion.
   private int currentPartition = 0;

   @Override
   public int partition(DataItem key, int numPartitions) {
      if (key instanceof Barrier) {
         int partitionToReturn = currentPartition;
         currentPartition = (currentPartition + 1) % numPartitions;
         return partitionToReturn;
      } else {
         return currentPartition;
      }
   }
}

DataStream<DataItem> stream = ...;
DataStream<DataItem> partitionedStream = stream.partitionCustom(new BarrierPartitioner(), item -> item);


Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Filip Niksic
Hi Chesnay,

Thanks for the reply. While your solution ultimately does use multiple partitions, from what I can tell the underlying processing is still sequential. Imagine a stream where barriers are quite rare, say a million values followed by a barrier. Then these million values all end up in the same partition and are added up sequentially, and while they are being processed, the other partitions are waiting for their turn. A truly parallel solution would partition the million values, process each partition in parallel to get the partial sums, and on each barrier aggregate the partial sums into a total sum.

Filip




Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Filip Niksic
Here is the solution I currently have. It turned out to be more complicated than I expected. It would be great if a more experienced Flink user could comment and point out the shortcomings. And if you have other ideas for achieving the same thing, let me know!

Let's start like in the original email, except now we set the time characteristic to EventTime and parallelism to a constant named PARALLELISM.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

final int PARALLELISM = 2;
env.setParallelism(PARALLELISM);

DataStream<DataItem> stream = env.fromElements(DataItem.class,
        new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());

The first step is to use a punctuation-based timestamp-and-watermark assigner as follows. We keep track of the number of barriers in the stream. We assign a timestamp n to the n-th barrier and all the values that immediately precede it, and we emit a watermark with timestamp n on the n-th barrier. This will allow us to define 1 millisecond tumbling windows that precisely capture the values between two barriers.

DataStream<DataItem> timedStream =
        stream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<DataItem>() {
            private long barrierCount = 0;

            @Override
            public long extractTimestamp(DataItem item, long previousTimestamp) {
                return barrierCount;
            }

            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(DataItem item, long extractedTimestamp) {
                if (item instanceof Barrier) {
                    barrierCount++;
                    return new Watermark(extractedTimestamp);
                }
                return null;
            }
        });

In the test input stream, the first value and barrier get a timestamp 0, and the next two values and the final barrier get a timestamp 1. Two watermarks with timestamps 0 and 1 are emitted.

To achieve parallelization, we partition the values by artificially generated keys. A value's key is based on its position in the stream, so we first wrap the values into a type that contains this information.

class ValueWithId {
    private final int val;
    private final long id;

    public ValueWithId(int val, long id) {
        this.val = val;
        this.id = id;
    }

    public int getVal() { return val; }
    public long getId() { return id; }
}

Here is the mapping. At the same time we can drop the barriers, since we no longer need them. Note that we need to explicitly set the mapping operator's parallelism to 1, since the operator is stateful.

DataStream<ValueWithId> wrappedStream =
        timedStream.flatMap(new FlatMapFunction<DataItem, ValueWithId>() {
            private long count = 0L;

            @Override
            public void flatMap(DataItem item, Collector<ValueWithId> collector) throws Exception {
                if (item instanceof Value) {
                    int val = ((Value) item).getVal();
                    collector.collect(new ValueWithId(val, count++));
                }
            }
        }).setParallelism(1);

Now we're ready to do the key-based partitioning. A value's key is its id as assigned above modulo PARALLELISM. We follow the partitioning by splitting the stream into 1 millisecond tumbling windows. Then we simply aggregate the partial sums, first for each key separately (and importantly, in parallel), and then for each window.

DataStream<Integer> partialSums = wrappedStream.keyBy(x -> x.getId() % PARALLELISM)
        .timeWindow(Time.of(1L, TimeUnit.MILLISECONDS))
        .aggregate(new AggregateFunction<ValueWithId, Integer, Integer>() {
            @Override
            public Integer createAccumulator() { return 0; }

            @Override
            public Integer add(ValueWithId valueWithId, Integer acc) { return acc + valueWithId.getVal(); }

            @Override
            public Integer getResult(Integer acc) { return acc; }

            @Override
            public Integer merge(Integer acc1, Integer acc2) { return acc1 + acc2; }
        })
        .timeWindowAll(Time.of(1L, TimeUnit.MILLISECONDS))
        .reduce((x, y) -> x + y);

Finally, in the original problem I asked for cumulative sums since the start of the stream, so we perform the last set of transformations to achieve that.

DataStream<Integer> cumulativeSums = partialSums
        .windowAll(GlobalWindows.create())
        .trigger(CountTrigger.of(1))
        .reduce((x, y) -> x + y);

cumulativeSums.print().setParallelism(1);
env.execute();
// We should see 1 followed by 3 as output

I am not completely sure if my usage of state in the timestamp-and-watermark assigner and the mapper is correct. Is it possible for Flink to duplicate the assigner, move it around and somehow mess up the timestamps? Likewise, is it possible for things to go wrong with the mapper?
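For the mapper specifically, I suppose a more careful version would implement CheckpointedFunction so that the counter survives failures. Something along these lines (an untested sketch; CountingWrapFunction is a made-up name, and I still assume the operator runs with parallelism 1):

public class CountingWrapFunction extends RichFlatMapFunction<DataItem, ValueWithId>
        implements CheckpointedFunction {

    private transient ListState<Long> countState;
    private long count;

    @Override
    public void flatMap(DataItem item, Collector<ValueWithId> out) {
        if (item instanceof Value) {
            out.collect(new ValueWithId(((Value) item).getVal(), count++));
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Store the current counter in operator state on each checkpoint.
        countState.clear();
        countState.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        countState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));
        // On restore (with parallelism 1) the list holds at most one entry.
        for (Long restored : countState.get()) {
            count = restored;
        }
    }
}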

Another concern I have is that my key-based partitions depend on the constant PARALLELISM. Ideally, the program should be flexible about the parallelism that happens to be available during runtime.
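One workaround I can think of (again just a sketch, not something I have tested) is to decouple the keys from the parallelism by using a fixed number of key groups that is comfortably larger than any parallelism the job is likely to run with:

// A fixed number of artificial keys, independent of the actual parallelism. As long
// as it exceeds the parallelism, the keyed aggregation still runs in parallel, and
// the windowAll reduce merges the per-key partial sums exactly as before.
final int NUM_KEY_GROUPS = 128;

DataStream<Integer> partialSums = wrappedStream
        .keyBy(x -> x.getId() % NUM_KEY_GROUPS)
        .timeWindow(Time.of(1L, TimeUnit.MILLISECONDS))
        .aggregate(sumAggregate);  // the same AggregateFunction as above, pulled into a variable

But maybe there is a more idiomatic way to do this.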

Finally, if anyone notices that I am in any part reinventing the wheel and that Flink already has a feature implementing some of the above, or that something can be done more elegantly, let me know!

Best regards,

Filip




Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Yun Gao
Hi Filip,

Overall, I also think that to increase the parallelism of the reduce beyond 1, we should use a parallel window to compute the partial sums and then sum them with a windowAll.

Regarding assignTimestampsAndWatermarks, from my side the current usage looks OK; it works the same way as for other operators. As for the keyBy partitioner, I think the "% PARALLELISM" is not necessary and Flink will take care of the parallelism. In other words, I think you can use .keyBy(x -> x.getId()) directly.

Best,
Yun


Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Filip Niksic
Hi Yun,

Thanks. Apropos the keyBy partitioner, I first tried it directly with .keyBy(x -> x.getId()). It is true that the items get evenly distributed among the available task slots, but since there is a single item per key, the aggregations that should be done in parallel become trivial, and the real summation happens in the reduce operator after windowAll. So at least the way things are currently set up, it seems that "%PARALLELISM" is necessary in order to effectively gain parallelism.

Filip




Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Theo
In reply to this post by Yun Gao
Hi Filip,

I don't really understand your problem here.

Do you have a source with a single sequential stream, where a barrier element appears from time to time? Or do you have a source like Kafka with multiple partitions?

If you are in case 2 with multiple partitions, what exactly do you mean by "order matters"? Will each partition have its own barrier? Or do you have just one barrier for all partitions? In that case, you will naturally have an ordering problem if your events themselves contain no time data.

If you have a "sequential source", why do you need parallelism? Wouldn't it work to read the partition data in one task (skipping deserialization as much as possible, only recognizing the barrier events) and then add a downstream task with higher parallelism that does the full deserialization and the rest of the work?
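Roughly, the layout I have in mind would look like this (just a sketch; SequentialBarrierSource and HeavyProcessingFunction are made-up placeholder names, not existing classes):

// Read the ordered stream with a single source subtask that only needs to
// recognize the barrier events...
DataStream<DataItem> raw = env
        .addSource(new SequentialBarrierSource())
        .setParallelism(1);

// ...then fan the records out and do the expensive work with higher parallelism.
raw.rebalance()
        .flatMap(new HeavyProcessingFunction())
        .setParallelism(8);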

Best regards
Theo



-------- Ursprüngliche Nachricht --------
Betreff: Re: [QUESTION] How to parallelize with explicit punctuation in Flink?
Von: Yun Gao
An: Filip Niksic ,user
Cc: Chesnay Schepler


      Hi Filip,

         As a whole, I also think to increase the parallelism of the reduce to more than 1, we should use a parallel window to compute the partial sum and then sum the partial sum with WindowAll. 

        For the assignTimestampAndWatermarks, From my side I think the current usage should be OK and it works the same to the other operators. Besides, for the keyBy Partitioner, I think "% PARALLELISM" is not necessary and Flink will take care of the parallelism. In other words, I think you can use .keyBy(x -> x.getId()) directly.

    Best, 
    Yun

------------------------------------------------------------------
From:Filip Niksic <[hidden email]>
Send Time:2019 Oct. 9 (Wed.) 12:21
To:user <[hidden email]>
Cc:Yun Gao <[hidden email]>; Chesnay Schepler <[hidden email]>
Subject:Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Here is the solution I currently have. It turned out to be more complicated than I expected. It would be great if a more experienced Flink user could comment and point out the shortcomings. And if you have other ideas for achieving the same thing, let me know!

Let's start like in the original email, except now we set the time characteristic to EventTime and parallelism to a constant named PARALLELISM.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

final int PARALLELISM = 2;
env.setParallelism(PARALLELISM);

DataStream<DataItem> stream = env.fromElements(DataItem.class,
        new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());

The first step is to use a punctuation-based timestamp-and-watermark assigner as follows. We keep track of the number of barriers in the stream. We assign a timestamp n to the n-th barrier and all the values that immediately precede it, and we emit a watermark with timestamp n on the n-th barrier. This will allow us to define 1 millisecond tumbling windows that precisely capture the values between two barriers.

DataStream<DataItem> timedStream =
        stream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<DataItem>() {
    private long barrierCount = 0;

    @Override
    public long extractTimestamp(DataItem item, long previousTimestamp) {
        return barrierCount;
    }

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(DataItem item, long extractedTimestamp) {
        if (item instanceof Barrier) {
            barrierCount++;
            return new Watermark(extractedTimestamp);
        }
        return null;
    }
});

In the test input stream, the first value and barrier get a timestamp 0, and the next two values and the final barrier get a timestamp 1. Two watermarks with timestamps 0 and 1 are emitted.

To achieve parallelization, we partition the values by artificially generated keys. A value's key is based on its position in the stream, so we first wrap the values into a type that contains this information.

class ValueWithId {
    private final int val;
    private final long id;

    public ValueWithId(int val, long id) {
        this.val = val;
        this.id = id;
    }
    public int getVal() { return val; }
    public long getId() { return id; }
}

Here is the mapping. At the same time we can drop the barriers, since we no longer need them. Note that we need to explicitly set the mapping operator's parallelism to 1, since the operator is stateful.

DataStream<ValueWithId> wrappedStream =
        timedStream.flatMap(new FlatMapFunction<DataItem, ValueWithId>() {
    private long count = 0L;

    @Override
    public void flatMap(DataItem item, Collector<ValueWithId> collector) throws Exception {
        if (item instanceof Value) {
            int val = ((Value) item).getVal();
            collector.collect(new ValueWithId(val, count++));
        }
    }
}).setParallelism(1);

Now we're ready to do the key-based partitioning. A value's key is its id as assigned above modulo PARALLELISM. We follow the partitioning by splitting the stream into 1 millisecond tumbling windows. Then we simply aggregate the partial sums, first for each key separately (and importantly, in parallel), and then for each window.

DataStream<Integer> partialSums = wrappedStream.keyBy(x -> x.getId() % PARALLELISM)
        .timeWindow(Time.of(1L, TimeUnit.MILLISECONDS))
        .aggregate(new AggregateFunction<ValueWithId, Integer, Integer>() {
            @Override
            public Integer createAccumulator() { return 0; }

            @Override
            public Integer add(ValueWithId valueWithId, Integer acc) { return acc + valueWithId.getVal(); }

            @Override
            public Integer getResult(Integer acc) { return acc; }

            @Override
            public Integer merge(Integer acc1, Integer acc2) { return acc1 + acc2; }
        })
        .timeWindowAll(Time.of(1L, TimeUnit.MILLISECONDS))
        .reduce((x, y) -> x + y);

Finally, in the original problem I asked for cumulative sums since the start of the stream, so we perform the last set of transformations to achieve that.

DataStream<Integer> cumulativeSums = partialSums
        .windowAll(GlobalWindows.create())
        .trigger(CountTrigger.of(1))
        .reduce((x, y) -> x + y);
cumulativeSums.print().setParallelism(1);
env.execute();
// We should see 1 followed by 3 as output

I am not completely sure if my usage of state in the timestamp-and-watermark assigner and the mapper is correct. Is it possible for Flink to duplicate the assigner, move it around and somehow mess up the timestamps? Likewise, is it possible for things to go wrong with the mapper?

Another concern I have is that my key-based partitions depend on the constant PARALLELISM. Ideally, the program should be flexible about the parallelism that happens to be available during runtime.
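
One untested workaround I can think of: hash the ids into a fixed number of buckets that is larger than any parallelism I expect to run with, so the keys no longer encode the exact parallelism (SumAggregate below stands for the anonymous AggregateFunction above, extracted into a named class):

// Untested: decouple the keys from the runtime parallelism.
final int NUM_BUCKETS = 128;  // assumption: at least as large as any parallelism we will ever use

DataStream<Integer> partialSums = wrappedStream
        .keyBy(x -> x.getId() % NUM_BUCKETS)
        .timeWindow(Time.of(1L, TimeUnit.MILLISECONDS))
        .aggregate(new SumAggregate())
        .timeWindowAll(Time.of(1L, TimeUnit.MILLISECONDS))
        .reduce((x, y) -> x + y);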

Finally, if anyone notices that I am in any part reinventing the wheel and that Flink already has a feature implementing some of the above, or that something can be done more elegantly, let me know!

Best regards,

Filip


On Tue, Oct 8, 2019 at 11:12 AM Filip Niksic <[hidden email]> wrote:
Hi Chesnay,

Thanks for the reply. While your solution ultimately does use multiple partitions, from what I can tell the underlying processing is still sequential. Imagine a stream where barriers are quite rare, say a million values is followed by a barrier. Then these million values all end up at the same partition and are added up sequentially, and while they are being processed, the other partitions are waiting for their turn. A truly parallel solution would partition the million values, process each partition in parallel to get the partial sums, and on each barrier aggregate the partial sums into a total sum.
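
Just to pin down the semantics I am after, here is the non-streaming analogue in plain Java (purely illustrative, nothing Flink-specific):

// Purely illustrative: for the values that arrive between two barriers,
// each worker sums its own slice and the partial sums are merged at the end,
// which corresponds to the moment the barrier arrives.
List<Integer> valuesBeforeBarrier = IntStream.rangeClosed(1, 1_000_000)
        .boxed()
        .collect(Collectors.toList());

int totalOnBarrier = valuesBeforeBarrier.parallelStream()
        .mapToInt(Integer::intValue)
        .sum();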

Filip


On Tue, Oct 8, 2019 at 9:09 AM Chesnay Schepler <[hidden email]> wrote:
In other words, you need a way to partition the stream such that a series of items followed by a barrier is never interrupted.

I'm wondering whether you could just apply DataStream#partitionCustom to your source:
public static class BarrierPartitioner implements Partitioner<DataItem> {

   // All items go to the current partition; a Barrier is sent there as well and
   // then advances the pointer, so each run of Values plus its closing Barrier
   // ends up in the same partition.
   private int currentPartition = 0;

   @Override
   public int partition(DataItem key, int numPartitions) {
      if (key instanceof Barrier) {
         int partitionToReturn = currentPartition;
         currentPartition = (currentPartition + 1) % numPartitions;
         return partitionToReturn;
      } else {
         return currentPartition;
      }
   }
}
DataStream<DataItem> stream = ...;
DataStream<DataItem> partitionedStream = stream.partitionCustom(new BarrierPartitioner(), item -> item);

On 08/10/2019 14:55, Filip Niksic wrote:
Hi Yun,

The behavior with increased parallelism should be the same as with no parallelism. In other words, for the input from the previous email, the output should always be 1, 3, regardless of parallelism. Operationally, the partial sums maintained in each subtask should somehow be aggregated before they are output.

To answer the second question, I know that watermarks provide the same functionality. Is there some way to convert the input with explicit punctuation into one with watermarks? I see there is an interface called AssignerWithPunctuatedWatermarks, maybe that's the solution. But I'm not sure how this assigner would be used. For example, it could maintain the number of previously seen Barriers and assign this number as a watermark to each Value, but then this number becomes state that needs to be shared between multiple substreams. Or perhaps the Barriers can somehow be duplicated and sent to each substream? Alternatively, is there some notion of event-based windows that would be triggered by specific user-defined elements in the stream? In such a mechanism, perhaps the watermarks would be used internally but not explicitly exposed to the user?
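
To make the last idea concrete, I imagine something like the following untested trigger that fires and purges a window whenever a Barrier arrives (I have no idea yet how well this would interact with parallelism):

public class BarrierTrigger extends Trigger<DataItem, GlobalWindow> {

    @Override
    public TriggerResult onElement(DataItem element, long timestamp,
                                   GlobalWindow window, TriggerContext ctx) {
        // Emit and reset the window contents on every Barrier, otherwise keep collecting.
        return element instanceof Barrier ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
        // No timers or custom state to clean up.
    }
}

// Possible (still non-parallel) use:
// stream.windowAll(GlobalWindows.create()).trigger(new BarrierTrigger())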

Best regards,

Filip






Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Filip Niksic
Hi Theo,

It is a single sequential stream.

If I read your response correctly, you are arguing that summing a bunch of numbers is not much more computationally intensive than assigning timestamps to those numbers, so if the latter has to be done sequentially anyway, then why should the former be done in parallel? To that I can only say that the example I gave is intentionally simple in order to make the problem conceptually clean. By understanding the conceptually clean version of the problem, we also gain insight into messier realistic versions where the operations we want to parallelize may be much more computationally intensive.

Filip







Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Theo
Hi Filip,

My point was not about the computation of the "maximum". My point was: you could hopefully read the stream sequentially and just assign punctuated watermarks to it. Once you have assigned the watermarks properly (and before you do your expensive computation, like in this case parsing the entire event and building the sum), you can tell Flink to repartition / key the data and shuffle it to the worker tasks over the network, so that the downstream operations are performed in parallel. Flink will, as far as I know, then take care of handling the watermark internally, and everything is fine.
I think it is a rare use case to have a sequential stream which cannot simply be read sequentially. If the stream is so large that you cannot do "read, extract the special events, shuffle over the network to other tasks" on a single host, you probably have a larger issue and need to rethink at the source level already, e.g. change the serialization to something with really lightweight parsing for finding the special events, or similar.
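
In rough, untested code, with BarrierPunctuationAssigner standing for your assigner from the earlier mail and ToValueWithId / SumPerKey standing for the wrapping and aggregation steps:

// Untested sketch: watermark assignment stays sequential, the heavy work is keyed
// and therefore runs with the full downstream parallelism.
DataStream<DataItem> timed = stream
        .assignTimestampsAndWatermarks(new BarrierPunctuationAssigner())  // your assigner, parallelism 1
        .setParallelism(1);

timed.flatMap(new ToValueWithId()).setParallelism(1)    // the wrapping step from your mail
        .keyBy(x -> x.getId())
        .timeWindow(Time.of(1L, TimeUnit.MILLISECONDS))
        .aggregate(new SumPerKey());                     // placeholder aggregation, runs in parallel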

Best regards
Theo


Von: "Filip Niksic" <[hidden email]>
An: "Theo Diefenthal" <[hidden email]>
CC: "user" <[hidden email]>
Gesendet: Donnerstag, 10. Oktober 2019 00:08:38
Betreff: Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Hi Theo,

It is a single sequential stream.

If I read your response correctly, you are arguing that summing a bunch of numbers is not much more computationally intensive than assigning timestamps to those numbers, so if the latter has to be done sequentially anyway, then why should the former be done in parallel? To that I can only say that the example I gave is intentionally simple in order to make the problem conceptually clean. By understanding the conceptually clean version of the problem, we also gain insight into messier realistic versions where the operations we want to parallelize may be much more computationally intensive.

Filip



On Wed, Oct 9, 2019 at 1:28 PM [hidden email] <[hidden email]> wrote:
Hi Filip, I don't really understand your problem here.
Do you have a source with a single sequential stream, where from time to time, there is a barrier element? Or do you have a source like Kafka with multiple partitions?
If you have case 2 with multiple partitions, what exactly do you mean by "order matters"? Will each partition have its own barrier? Or do you have just one barrier for all partitions? In that case, you will naturally have an ordering problem if your events itself contain no time data.
If you have a "sequential source" why do you need parallelism? Won't it work out to read that partition data in one task (possibly skipping deserialization as much as possible to only recognize barrier events) and then add a downstream task with higher parallelism doing the full deserialization and other work?
Best regardsTheo
-------- Ursprüngliche Nachricht --------
Betreff: Re: [QUESTION] How to parallelize with explicit punctuation in Flink?
Von: Yun Gao
An: Filip Niksic ,user
Cc: Chesnay Schepler


      Hi Filip,

         As a whole, I also think to increase the parallelism of the reduce to more than 1, we should use a parallel window to compute the partial sum and then sum the partial sum with WindowAll.

        For the assignTimestampAndWatermarks, From my side I think the current usage should be OK and it works the same to the other operators. Besides, for the keyBy Partitioner, I think "% PARALLELISM" is not necessary and Flink will take care of the parallelism. In other words, I think you can use .keyBy(x -> x.getId()) directly.

    Best,
    Yun


------------------------------------------------------------------
From:Filip Niksic <[hidden email]>
Send Time:2019 Oct. 9 (Wed.) 12:21
To:user <[hidden email]>
Cc:Yun Gao <[hidden email]>; Chesnay Schepler <[hidden email]>
Subject:Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Here is the solution I currently have. It turned out to be more complicated than I expected. It would be great if a more experienced Flink user could comment and point out the shortcomings. And if you have other ideas for achieving the same thing, let me know!

Let's start like in the original email, except now we set the time characteristic to EventTime and parallelism to a constant named PARALLELISM.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

final int PARALLELISM = 2;
env.setParallelism(PARALLELISM);

DataStream<DataItem> stream = env.fromElements(DataItem.class,
        new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());

The first step is to use a punctuation-based timestamp-and-watermark assigner as follows. We keep track of the number of barriers in the stream. We assign a timestamp n to the n-th barrier and all the values that immediately precede it, and we emit a watermark with timestamp n on the n-th barrier. This will allow us to define 1 millisecond tumbling windows that precisely capture the values between two barriers.

DataStream<DataItem> timedStream =
        stream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<DataItem>() {
    private long barrierCount = 0;

    @Override
    public long extractTimestamp(DataItem item, long previousTimestamp) {
        return barrierCount;
    }

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(DataItem item, long extractedTimestamp) {
        if (item instanceof Barrier) {
            barrierCount++;
            return new Watermark(extractedTimestamp);
        }
        return null;
    }
});

In the test input stream, the first value and barrier get a timestamp 0, and the next two values and the final barrier get a timestamp 1. Two watermarks with timestamps 0 and 1 are emitted.

To achieve parallelization, we partition the values by artificially generated keys. A value's key is based on its position in the stream, so we first wrap the values into a type that contains this information.

class ValueWithId {
    private final int val;
    private final long id;

    public ValueWithId(int val, long id) {
        this.val = val;
        this.id = id;
    }
    public int getVal() { return val; }
    public long getId() { return id; }
}

Here is the mapping. At the same time we can drop the barriers, since we no longer need them. Note that we need to explicitly set the mapping operator's parallelism to 1, since the operator is stateful.

DataStream<ValueWithId> wrappedStream =
        timedStream.flatMap(new FlatMapFunction<DataItem, ValueWithId>() {
    private long count = 0L;

    @Override
    public void flatMap(DataItem item, Collector<ValueWithId> collector) throws Exception {
        if (item instanceof Value) {
            int val = ((Value) item).getVal();
            collector.collect(new ValueWithId(val, count++));
        }
    }
}).setParallelism(1);

Now we're ready to do the key-based partitioning. A value's key is its id as assigned above modulo PARALLELISM. We follow the partitioning by splitting the stream into 1 millisecond tumbling windows. Then we simply aggregate the partial sums, first for each key separately (and importantly, in parallel), and then for each window.

DataStream<Integer> partialSums = wrappedStream.keyBy(x -> x.getId() % PARALLELISM)
        .timeWindow(Time.of(1L, TimeUnit.MILLISECONDS))
        .aggregate(new AggregateFunction<ValueWithId, Integer, Integer>() {
            @Override
            public Integer createAccumulator() { return 0; }

            @Override
            public Integer add(ValueWithId valueWithId, Integer acc) { return acc + valueWithId.getVal(); }

            @Override
            public Integer getResult(Integer acc) { return acc; }

            @Override
            public Integer merge(Integer acc1, Integer acc2) { return acc1 + acc2; }
        })
        .timeWindowAll(Time.of(1L, TimeUnit.MILLISECONDS))
        .reduce((x, y) -> x + y);

Finally, in the original problem I asked for cumulative sums since the start of the stream, so we perform the last set of transformations to achieve that.

DataStream<Integer> cumulativeSums = partialSums
        .windowAll(GlobalWindows.create())
        .trigger(CountTrigger.of(1))
        .reduce((x, y) -> x + y);
cumulativeSums.print().setParallelism(1);
env.execute();
// We should see 1 followed by 3 as output

I am not completely sure if my usage of state in the timestamp-and-watermark assigner and the mapper is correct. Is it possible for Flink to duplicate the assigner, move it around and somehow mess up the timestamps? Likewise, is it possible for things to go wrong with the mapper?

Another concern I have is that my key-based partitions depend on the constant PARALLELISM. Ideally, the program should be flexible about the parallelism that happens to be available during runtime.

Finally, if anyone notices that I am in any part reinventing the wheel and that Flink already has a feature implementing some of the above, or that something can be done more elegantly, let me know!

Best regards,

Filip


On Tue, Oct 8, 2019 at 11:12 AM Filip Niksic <[hidden email]> wrote:

Hi Chesnay,

Thanks for the reply. While your solution ultimately does use multiple partitions, from what I can tell the underlying processing is still sequential. Imagine a stream where barriers are quite rare, say a million values is followed by a barrier. Then these million values all end up at the same partition and are added up sequentially, and while they are being processed, the other partitions are waiting for their turn. A truly parallel solution would partition the million values, process each partition in parallel to get the partial sums, and on each barrier aggregate the partial sums into a total sum.

Filip


On Tue, Oct 8, 2019 at 9:09 AM Chesnay Schepler <[hidden email]> wrote:
In other words, you need a way to partition the stream such that a series of items followed by a barrier are never interrupted.

I'm wondering whether you could just apply DataStream#partitionCustom to your source:
public static class BarrierPartitioner implements Partitioner<DataItem> {

   private int currentPartition = 0;
   @Override
   public int partition(DataItem key, int numPartitions) {
      if (key instanceof Barrier) {
         int partitionToReturn = currentPartition;
         currentPartition = (currentPartition + 1) % numPartitions;
         return partitionToReturn;
      } else {
         return currentPartition;
      }
   }
}

DataStream<DataItem> stream = ...;
DataStream<DataItem> partitionedStream = stream.partitionCustom(new BarrierPartitioner(), item -> item);

On 08/10/2019 14:55, Filip Niksic wrote:
Hi Yun,

The behavior with increased parallelism should be the same as with no parallelism. In other words, for the input from the previous email, the output should always be 1, 3, regardless of parallelism. Operationally, the partial sums maintained in each subtask should somehow be aggregated before they are output.

To answer the second question, I know that watermarks provide the same functionality. Is there some way to convert the input with explicit punctuation into one with watermarks? I see there is an interface called AssignerWithPunctuatedWatermarks, maybe that's the solution. But I'm not sure how this assigner would be used. For example, it could maintain the number of previously seen Barriers and assign this number as a watermark to each Value, but then this number becomes the state that needs to be shared between multiple substreams. Or perhaps the Barriers can somehow be duplicated and sent to each substream? Alternatively, is there some notion of event-based windows that would be triggered by specific user-defined elements in the stream? In such mechanism perhaps the watermarks would be used internally, but they would not be explicitly exposed to the user?

Best regards,

Filip


On Tue, Oct 8, 2019 at 2:19 AM Yun Gao <[hidden email]> wrote:

       Hi Filip,
           I have one question on the problem: what is the expected behavior when the  parallelism of the FlatMapFunction is increased to more than 1? Should each subtask maintains the partial sum of all values received, and whenever the barrier is received, then it just outputs the partial sum of the received value ?

          Another question is that I think in Flink the watermark mechanism has provided the functionality similar to punctuation,  therefore is it possible to implement the same logic with the Flink Window directly?
    Best,
    Yun

------------------------------------------------------------------
From:Filip Niksic <[hidden email]>
Send Time:2019 Oct. 8 (Tue.) 08:56
To:user <[hidden email]>
Subject:[QUESTION] How to parallelize with explicit punctuation in Flink?

Hi all,
What would be a natural way to implement a parallel version of the following Flink program?
Suppose I have a stream of items of type DataItem with two concrete implementations of DataItem: Value and Barrier. Let’s say that Value holds an integer value, and Barrier acts as explicit punctuation.
public interface DataItem {}
public class Value implements DataItem {
 private final int val;
 public Value(int val) { this.val = val; }
 public int getVal() { return val; }
}
public class Barrier implements DataItem {}
The program should maintain a sum of values seen since the beginning of the stream. On each Barrier, the program should output the sum seen so far.
An obvious way to implement this would be with a FlatMapFunction, maintaining the sum as state and emitting it on each Barrier.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<DataItem> stream = env.fromElements(DataItem.class,
 new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());
stream.flatMap(new FlatMapFunction<DataItem, Integer>() {
 private int sum = 0;
 @Override
 public void flatMap(DataItem dataItem, Collector<Integer> collector) throws Exception {
 if (dataItem instanceof Value) {
 sum += ((Value) dataItem).getVal();
       } else {
           collector.collect(sum);
       }
   }
}).setParallelism(1).print().setParallelism(1);
env.execute();
// We should see 1 followed by 3 as output
However, such an operator cannot be parallelized, since the order of Values and Barriers matters. That’s why I need to set parallelism to 1 above. Is there a way to rewrite this to exploit parallelism?
(Another reason to set parallelism to 1 above is that I’m assuming there is a single instance of the FlatMapFunction. A proper implementation would take more care in using state. Feel free to comment on that as well.)

Best regards,

Filip Niksic




Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Filip Niksic
Hi Theo,

Isn't the solution I proposed exactly the solution you talk about? Read the stream sequentially, assign punctuated watermarks, keyBy to achieve parallelism.

Perhaps you're reading too much into my question. When I sent the first email, I didn't even know about punctuated watermarks. Dealing with a sequential stream that cannot be read sequentially was way beyond what I had in mind. :)

Filip


