StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
final int PARALLELISM = 2;
env.setParallelism(PARALLELISM);

DataStream<DataItem> stream = env.fromElements(DataItem.class,
        new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());

DataStream<DataItem> timedStream =
        stream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<DataItem>() {
            private long barrierCount = 0;

            @Override
            public long extractTimestamp(DataItem item, long previousTimestamp) {
                return barrierCount;
            }

            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(DataItem item, long extractedTimestamp) {
                if (item instanceof Barrier) {
                    barrierCount++;
                    return new Watermark(extractedTimestamp);
                }
                return null;
            }
        });

class ValueWithId {
    private final int val;
    private final long id;

    public ValueWithId(int val, long id) {
        this.val = val;
        this.id = id;
    }

    public int getVal() { return val; }
    public long getId() { return id; }
}

DataStream<ValueWithId> wrappedStream =
        timedStream.flatMap(new FlatMapFunction<DataItem, ValueWithId>() {
            private long count = 0L;

            @Override
            public void flatMap(DataItem item, Collector<ValueWithId> collector) throws Exception {
                if (item instanceof Value) {
                    int val = ((Value) item).getVal();
                    collector.collect(new ValueWithId(val, count++));
                }
            }
        }).setParallelism(1);

DataStream<Integer> partialSums = wrappedStream.keyBy(x -> x.getId() % PARALLELISM)
        .timeWindow(Time.of(1L, TimeUnit.MILLISECONDS))
        .aggregate(new AggregateFunction<ValueWithId, Integer, Integer>() {
            @Override
            public Integer createAccumulator() { return 0; }

            @Override
            public Integer add(ValueWithId valueWithId, Integer acc) { return acc + valueWithId.getVal(); }

            @Override
            public Integer getResult(Integer acc) { return acc; }

            @Override
            public Integer merge(Integer acc1, Integer acc2) { return acc1 + acc2; }
        })
        .timeWindowAll(Time.of(1L, TimeUnit.MILLISECONDS))
        .reduce((x, y) -> x + y);

DataStream<Integer> cumulativeSums = partialSums
        .windowAll(GlobalWindows.create())
        .trigger(CountTrigger.of(1))
        .reduce((x, y) -> x + y);

cumulativeSums.print().setParallelism(1);
env.execute();
// We should see 1 followed by 3 as output
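
The arithmetic behind the pipeline above can be checked without a Flink cluster. The following is only a plain-Java sketch of the intended computation, not the Flink job itself: the class name BarrierSumSketch and the encoding of a Barrier as null are assumptions made for this illustration. Values are spread over PARALLELISM keys by id, each key keeps a partial sum for the current barrier epoch, and each Barrier folds the partial sums into the running total.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BarrierSumSketch {
    // Hypothetical encoding for this sketch only: null stands for a Barrier,
    // any non-null Integer stands for a Value.
    static List<Integer> cumulativeSums(List<Integer> items, int parallelism) {
        int[] partial = new int[parallelism]; // one partial sum per key for the current epoch
        long nextId = 0;                      // mirrors the id assigned by the flatMap step
        int total = 0;                        // running sum since the start of the stream
        List<Integer> out = new ArrayList<>();
        for (Integer item : items) {
            if (item != null) {
                // Value: route by id % parallelism and update that key's partial sum
                int key = (int) (nextId++ % parallelism);
                partial[key] += item;
            } else {
                // Barrier: close the epoch by merging all partial sums into the total
                for (int k = 0; k < parallelism; k++) {
                    total += partial[k];
                    partial[k] = 0;
                }
                out.add(total);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same input as the Flink program: 1, Barrier, 3, -1, Barrier
        System.out.println(cumulativeSums(Arrays.asList(1, null, 3, -1, null), 2)); // prints [1, 3]
    }
}
```

In this model the per-key additions inside an epoch are independent of each other, which is the part the keyed window is meant to run in parallel.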
Hi Chesnay,

Thanks for the reply. While your solution ultimately does use multiple partitions, from what I can tell the underlying processing is still sequential. Imagine a stream where barriers are quite rare, say a million values are followed by a barrier. Then these million values all end up at the same partition and are added up sequentially, and while they are being processed, the other partitions are waiting for their turn. A truly parallel solution would partition the million values, process each partition in parallel to get the partial sums, and on each barrier aggregate the partial sums into a total sum.

Filip

On Tue, Oct 8, 2019 at 9:09 AM Chesnay Schepler <[hidden email]> wrote:

In other words, you need a way to partition the stream such that a series of items followed by a barrier is never interrupted.

I'm wondering whether you could just apply DataStream#partitionCustom to your source:

public static class BarrierPartitioner implements Partitioner<DataItem> {
    private int currentPartition = 0;

    @Override
    public int partition(DataItem key, int numPartitions) {
        if (key instanceof Barrier) {
            int partitionToReturn = currentPartition;
            currentPartition = (currentPartition + 1) % numPartitions;
            return partitionToReturn;
        } else {
            return currentPartition;
        }
    }
}

DataStream<DataItem> stream = ...;
DataStream<DataItem> partitionedStream = stream.partitionCustom(new BarrierPartitioner(), item -> item);
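
To make the routing rule of this partitioner concrete, here is a plain-Java sketch that applies the same rule to the example input outside of Flink. The class name BarrierRoutingSketch and the encoding of a Barrier as null are assumptions for illustration; it models only which partition index each item would be sent to.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BarrierRoutingSketch {
    // Hypothetical encoding for this sketch only: null stands for a Barrier,
    // any non-null Integer stands for a Value.
    static List<Integer> route(List<Integer> items, int numPartitions) {
        int currentPartition = 0;
        List<Integer> targets = new ArrayList<>();
        for (Integer item : items) {
            // Every item, Value or Barrier, goes to the current partition
            targets.add(currentPartition);
            if (item == null) {
                // After a Barrier, the next run of items moves to the next partition
                currentPartition = (currentPartition + 1) % numPartitions;
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        // Input: 1, Barrier, 3, -1, Barrier with two partitions
        System.out.println(route(Arrays.asList(1, null, 3, -1, null), 2)); // prints [0, 0, 1, 1, 1]
    }
}
```

All items between two barriers share one target partition, so a long run of values with few barriers is handled by a single partition at a time.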
On 08/10/2019 14:55, Filip Niksic wrote:
Hi Yun,
The behavior with increased parallelism should be the same as with no parallelism. In other words, for the input from the previous email, the output should always be 1, 3, regardless of parallelism. Operationally, the partial sums maintained in each subtask should somehow be aggregated before they are output.
To answer the second question, I know that watermarks provide the same functionality. Is there some way to convert the input with explicit punctuation into one with watermarks? I see there is an interface called AssignerWithPunctuatedWatermarks, maybe that's the solution. But I'm not sure how this assigner would be used. For example, it could maintain the number of previously seen Barriers and assign this number as a watermark to each Value, but then this number becomes the state that needs to be shared between multiple substreams. Or perhaps the Barriers can somehow be duplicated and sent to each substream? Alternatively, is there some notion of event-based windows that would be triggered by specific user-defined elements in the stream? In such a mechanism perhaps the watermarks would be used internally, but they would not be explicitly exposed to the user?
Best regards,
Filip
On Tue, Oct 8, 2019 at 2:19 AM Yun Gao <[hidden email]> wrote:
Hi Filip,
I have one question about the problem: what is the expected behavior when the parallelism of the FlatMapFunction is increased to more than 1? Should each subtask maintain a partial sum of all the values it has received and, whenever it receives a barrier, simply output that partial sum?
Another question: I think the watermark mechanism in Flink already provides functionality similar to punctuation, so is it possible to implement the same logic directly with Flink windows?
Best,
Yun
------------------------------------------------------------------
From: Filip Niksic <[hidden email]>
Send Time: 2019 Oct. 8 (Tue.) 08:56
To: user <[hidden email]>
Subject: [QUESTION] How to parallelize with explicit punctuation in Flink?
Hi all,
What would be a natural way to implement a parallel version of the following Flink program?
Suppose I have a stream of items of type DataItem with two concrete implementations of DataItem: Value and Barrier. Let’s say that Value holds an integer value, and Barrier acts as explicit punctuation.
public interface DataItem {}

public class Value implements DataItem {
    private final int val;
    public Value(int val) { this.val = val; }
    public int getVal() { return val; }
}

public class Barrier implements DataItem {}
The program should maintain a sum of values seen since the beginning of the stream. On each Barrier, the program should output the sum seen so far.
An obvious way to implement this would be with a FlatMapFunction, maintaining the sum as state and emitting it on each Barrier.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<DataItem> stream = env.fromElements(DataItem.class,
        new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());

stream.flatMap(new FlatMapFunction<DataItem, Integer>() {
    private int sum = 0;

    @Override
    public void flatMap(DataItem dataItem, Collector<Integer> collector) throws Exception {
        if (dataItem instanceof Value) {
            sum += ((Value) dataItem).getVal();
        } else {
            collector.collect(sum);
        }
    }
}).setParallelism(1).print().setParallelism(1);

env.execute();
// We should see 1 followed by 3 as output
However, such an operator cannot be parallelized, since the order of Values and Barriers matters. That’s why I need to set parallelism to 1 above. Is there a way to rewrite this to exploit parallelism?
(Another reason to set parallelism to 1 above is that I’m assuming there is a single instance of the FlatMapFunction. A proper implementation would take more care in using state. Feel free to comment on that as well.)
Best regards,
Filip Niksic