union followed by timestamp assignment / watermark generation

Petr Novotnik
Hello Flinkers,

Given this small example program:

> https://pastebin.com/30JbbgpH

I'd expect the output:

> one|three
> two|four

However, I consistently receive ...

> one
> two|four

... due to "three" being considered a late-comer which then gets
discarded. When I remove `assignTimestampsAndWatermarks` after the
`union` and place it separately on each of the union's inputs, i.e.
before the `union`, I get what I expect.

Now, after digging through Flink's source code, this behavior actually
seems logical to me (since the `assignTimestampsAndWatermarks` and `map`
operators form one task). Though, from a user/api perspective, it is at
least surprising.

I wanted to ask whether this kind of behavior is known, intended, or maybe
something to be improved to avoid the gotcha?

Many thanks in advance,
Pete.


Re: union followed by timestamp assignment / watermark generation

Aljoscha Krettek
Hi Petr,

I just stumbled across this (slightly older) mail. Your example on pastebin is not available anymore but I’m guessing you have roughly these two topologies:

1.

Source1 -> Map1 -> ExtractTimestamps -|
                                      | -> Map3 …
Source2 -> Map2 -> ExtractTimestamps -|

The union is not visible at the graph level, it’s implicit in the combination of the two input streams.

2.

Source1 -> Map1 -|
                 | -> ExtractTimestamps -> Map3 …
Source2 -> Map2 -|

I’m also guessing that you have a timestamp/watermark assigner where the watermark is the highest-seen timestamp minus some lateness bound. I think the behaviour is not necessarily an artefact of the Flink implementation (with maps and extractors being fused together) but results from the graph itself, from how watermarks are defined, and from how the extractor works.

In the first case, each stream (before the union) has its own watermark and the watermark at Map3 is the minimum over those watermarks. This explains why a lower watermark on one stream holds back the overall watermark at Map3.

In the second case, the two streams are unioned together before extracting a timestamp/watermark, and the choice of timestamp extractor (which takes the highest-seen timestamp) means that the watermark now advances “faster” because there is logically no slower, separate stream anymore.

Is that analysis correct? Does my description roughly make sense?
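
To make the two cases concrete, here is a minimal plain-Java sketch of the reasoning above. It is a simplified model and not Flink code: the class `WatermarkPlacementSketch`, its `Event` type, and the `windowResults` method are hypothetical names made up for illustration. It assumes tumbling windows of 3 ms, an out-of-orderness bound of 0, a record counting as late once the watermark has reached the last timestamp of its window, and the arrival order you would get from a fast left input and a slow right input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WatermarkPlacementSketch {

    /** A record with its event timestamp and the index of the union input it arrived on. */
    static final class Event {
        final String value;
        final long timestamp;
        final int input;

        Event(String value, long timestamp, int input) {
            this.value = value;
            this.timestamp = timestamp;
            this.input = input;
        }
    }

    /** Assumed arrival order: the fast left input (0) is done before the slow right input (1) emits. */
    static List<Event> exampleArrivalOrder() {
        return Arrays.asList(
            new Event("one", 1L, 0), new Event("two", 3L, 0),
            new Event("three", 2L, 1), new Event("four", 4L, 1));
    }

    /**
     * Simplified model (an assumption, not Flink's implementation): the watermark is the
     * highest timestamp seen, late records are dropped, and all windows are flushed at the end.
     */
    static List<String> windowResults(List<Event> arrivals, long windowSize, boolean assignPerInput) {
        long[] maxSeen = {Long.MIN_VALUE, Long.MIN_VALUE}; // highest timestamp seen per input
        Map<Long, List<String>> windows = new TreeMap<>();
        for (Event e : arrivals) {
            // Per-input assigners: the downstream watermark is the minimum over both inputs.
            // Single assigner after the union: one shared maximum over all records.
            long watermark = assignPerInput
                ? Math.min(maxSeen[0], maxSeen[1])
                : Math.max(maxSeen[0], maxSeen[1]);
            long windowStart = e.timestamp - Math.floorMod(e.timestamp, windowSize);
            long windowMaxTimestamp = windowStart + windowSize - 1;
            // Keep the record only if the watermark has not yet reached the end of its window.
            if (windowMaxTimestamp > watermark) {
                windows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(e.value);
            }
            maxSeen[e.input] = Math.max(maxSeen[e.input], e.timestamp);
        }
        List<String> results = new ArrayList<>();
        for (List<String> values : windows.values()) {
            results.add(String.join("|", values));
        }
        return results;
    }

    public static void main(String[] args) {
        List<Event> arrivals = exampleArrivalOrder();
        System.out.println(windowResults(arrivals, 3L, false)); // [one, two|four]
        System.out.println(windowResults(arrivals, 3L, true));  // [one|three, two|four]
    }
}
```

With the single assigner after the union, "two" pushes the shared watermark to 3 before "three" (timestamp 2) arrives, so "three" is dropped. With one assigner per input, the right input's watermark has not advanced yet, so the minimum holds the window open.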

Best,
Aljoscha

> On 6. May 2017, at 15:00, Petr Novotnik <[hidden email]> wrote:
> [...]

Re: union followed by timestamp assignment / watermark generation

Petr Novotnik
Hello Aljoscha,

Fortunately, I found the program in Google's caches :) I've attached it
below for reference. I'm stunned by how accurately you have hit the
point given the few pieces of information I left in the original text. +1

Yes, it's exactly as you explained. Can you think of a scenario where it
would lead to reasonable results if a user placed the
time-extraction/watermark-generation (directly or indirectly) after a
union operation? So far I couldn't, and I'm starting to believe that it
would at least be nice to warn users if they try to do so.

Many thanks for your analysis and your time,
Pete.


> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> import java.util.Arrays;
> import java.util.List;
> import java.util.concurrent.TimeUnit;
>
> public class TimestampAssignmentTest {
>
>   public static void main(String[] args) throws Exception {
>     runTest();
>   }
>
>   private static void runTest() throws Exception {
>     List<Tuple2<String, Long>> left = Arrays.asList(Tuple2.of("one", 1L), Tuple2.of("two", 3L));
>     List<Tuple2<String, Long>> right = Arrays.asList(Tuple2.of("three", 2L), Tuple2.of("four", 4L));
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     env.getConfig().setAutoWatermarkInterval(10);
>     env.setParallelism(1);
>
>     // ~ a very fast source
>     SingleOutputStreamOperator<Tuple2<String, Long>> leftInput =
>         env.addSource(new DelayedSourceFunction<>(left, Time.milliseconds(0)))
>         .returns(new TypeHint<Tuple2<String, Long>>() {});
>     // ~ a very slow source
>     SingleOutputStreamOperator<Tuple2<String, Long>> rightInput =
>         env.addSource(new DelayedSourceFunction<>(right, Time.seconds(10)))
>         .returns(new TypeHint<Tuple2<String, Long>>() {});
>
>     leftInput.union(rightInput)
>         .assignTimestampsAndWatermarks(new EventTimeExtractor<>(Time.milliseconds(0)))
>         .map(t -> t.f0).returns(new TypeHint<String>() {})
>         .timeWindowAll(Time.milliseconds(3))
>         .reduce((ReduceFunction<String>) (s, t) -> s + "|" + t).returns(new TypeHint<String>() {})
>         .print();
>
>     env.execute();
>   }
>
>   static class EventTimeExtractor<I>
>     extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<I, Long>> {
>
>     EventTimeExtractor(Time maxOutOfOrderness) {
>       super(maxOutOfOrderness);
>     }
>
>     @Override
>     public long extractTimestamp(Tuple2<I, Long> element) {
>       return element.f1;
>     }
>   }
>
>   static class DelayedSourceFunction<I> implements SourceFunction<I> {
>     private volatile boolean running;
>     private final List<I> elems;
>     private final long delayMillis;
>
>     DelayedSourceFunction(List<I> elems, Time delay) {
>       this.elems = elems;
>       this.delayMillis = delay.toMilliseconds();
>     }
>
>     @Override
>     public void run(SourceContext<I> ctx) throws Exception {
>       running = true;
>       for (I elem : elems) {
>         if (!running) {
>           break;
>         }
>         delay();
>         ctx.collect(elem);
>       }
>     }
>
>     private void delay() throws InterruptedException {
>       if (delayMillis > 0) {
>         long start = System.nanoTime();
>         while (true) {
>           long curr = System.nanoTime();
>           long waitMillis = TimeUnit.NANOSECONDS.toMillis(curr - start);
>           if (waitMillis < delayMillis) {
>             Thread.sleep(delayMillis - waitMillis);
>           } else {
>             break;
>           }
>         }
>       }
>     }
>
>     @Override
>     public void cancel() {
>       running = false;
>     }
>   }
> }


On 06/14/2017 04:02 PM, Aljoscha Krettek wrote:

> [...]

Re: union followed by timestamp assignment / watermark generation

Aljoscha Krettek
Hi,

Yes, I can’t think of any cases right now where placing the extractor after a union makes sense. In general, I think it’s always best to place the timestamp extractor as close to the sources (or in the sources, for Kafka) as possible. Right now it would be quite hard (and probably a bit hacky) to detect such a situation and give a warning. If you want, please open a Jira issue so that we don’t forget about this; maybe we can solve it in the future, when we have a slightly different graph structure that would allow us to run such analyses on the user program.

Best,
Aljoscha

> On 15. Jun 2017, at 13:20, Petr Novotnik <[hidden email]> wrote:
> [...]