Assigning timestamps and watermarks several times, several datastreams?

Aakarsh Madhavan
Hi!

I am currently using Flink 1.4.2 with the following timestamp and watermark assigner:

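import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

class TSWM implements AssignerWithPunctuatedWatermarks<POJO> {
  // Highest event timestamp seen so far by this assigner instance.
  long maxTS = Long.MIN_VALUE;

  @Override
  public Watermark checkAndGetNextWatermark(POJO event, long l) {
    maxTS = Math.max(maxTS, event.TS);
    return new Watermark(maxTS);
  }

  @Override
  public long extractTimestamp(POJO event, long l) {
    maxTS = Math.max(maxTS, event.TS);
    return event.TS;
  }
}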

DataStream<POJO> ds1 = ... .assignTimestampsAndWatermarks(new TSWM());

DataStream<POJO> ds2 = ... .assignTimestampsAndWatermarks(new TSWM());

Suppose I run the code above. What confuses me is how the overall watermarking system behaves.

Now I want to do the following:

ds1.keyBy("id").window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30))).trigger(EventTimeTrigger.create()).apply(someApplyFunction);

ds2.keyBy("id").window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30))).trigger(EventTimeTrigger.create()).apply(someApplyFunction);

My main question is how this works with the watermarks. Do `ds1` and `ds2` have separate watermarks that do not affect each other, i.e., do they operate independently?

I am not sure, for example, how the window trigger would fire or how the watermarks would advance. Do the watermarks start fresh and advance separately for each stream, so that no data is lost?

Thanks!

Re: Assigning timestamps and watermarks several times, several datastreams?

Fabian Hueske-2
Hi,

Watermarks of streams are independent as long as the streams are not connected with each other.
When you union, join, or connect two streams in any other way, their watermarks are fused, which means that they are synced to the "slower" stream, i.e., the stream with the earlier watermarks.
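
To make the "slower stream" rule concrete, here is a minimal plain-Java sketch (no Flink dependency) of an operator with two inputs that keeps the latest watermark per input and forwards the minimum. The class and method names are hypothetical and are not Flink's internal API; the sketch only illustrates the rule described above.

```java
import java.util.Arrays;

/**
 * Sketch of watermark merging for a multi-input operator.
 * All names are hypothetical; this is not Flink's implementation.
 */
public class WatermarkMergeSketch {

    /** Tracks the latest watermark per input and exposes the combined one. */
    static final class MultiInputWatermark {
        private final long[] perInput;
        private long combined = Long.MIN_VALUE;

        MultiInputWatermark(int numInputs) {
            perInput = new long[numInputs];
            // Until an input has sent a watermark, it holds the combined one back.
            Arrays.fill(perInput, Long.MIN_VALUE);
        }

        /** Records a watermark from one input and returns the combined watermark. */
        long onWatermark(int input, long watermark) {
            // A watermark never moves backwards on a single input.
            perInput[input] = Math.max(perInput[input], watermark);

            // The combined watermark follows the slowest input.
            long min = Long.MAX_VALUE;
            for (long w : perInput) {
                min = Math.min(min, w);
            }
            // The combined watermark never moves backwards either.
            combined = Math.max(combined, min);
            return combined;
        }
    }

    public static void main(String[] args) {
        MultiInputWatermark merged = new MultiInputWatermark(2);
        System.out.println(merged.onWatermark(0, 1_000L));
        System.out.println(merged.onWatermark(1, 400L));
        System.out.println(merged.onWatermark(0, 2_000L));
        System.out.println(merged.onWatermark(1, 1_500L));
    }
}
```

In this run the combined watermark stays at Long.MIN_VALUE until the second input reports 400, remains at 400 while only the first input advances, and moves to 1500 once the second input catches up. Two streams that are never connected would each keep their own tracker and would not influence each other.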

Best, Fabian

(In reply to Aakarsh Madhavan <[hidden email]>, Tue, 19 Feb 2019, 23:34.)