Flink wrong Watermark in Periodic watermark


Soheil Pourbafrani
Using Flink's EventTime feature, I implemented the AssignerWithPeriodicWatermarks interface as follows:

public static class SampleTimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple3<String, Long, JSONObject>> {
    private static final long serialVersionUID = 1L;
    private long MAX_TIMESTAMP;
    private final long DELEY = 3000;

    @Override
    public long extractTimestamp(Tuple3<String, Long, JSONObject> t, long l) {
        long timestamp = t.f1;
        MAX_TIMESTAMP = Math.max(timestamp, MAX_TIMESTAMP);
        System.out.println("Max TimeStamp : " + MAX_TIMESTAMP);
        return timestamp;
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        System.out.println("Current WatreMark : " + (MAX_TIMESTAMP - DELEY));
        return new Watermark(MAX_TIMESTAMP - DELEY);
    }
}
In addition, I set the watermark interval to 100 milliseconds:
env.getConfig().setAutoWatermarkInterval(100);
But when I check the logs, some watermarks are -3000. That means getCurrentWatermark is seeing MAX_TIMESTAMP as zero (0 - 3000 = -3000), even though the same logs show that MAX_TIMESTAMP has a value greater than zero!
Here is a part of the output:
Max TimeStamp : 1532934243136
Max TimeStamp : 1532934243136
Max TimeStamp : 1532934243144
Max TimeStamp : 1532934243144
Max TimeStamp : 1532934243152
Max TimeStamp : 1532934243152
Max TimeStamp : 1532934243160
Max TimeStamp : 1532934243160
Max TimeStamp : 1532934243168
Max TimeStamp : 1532934243168
Current WatreMark : 1532934240168
Current WatreMark : -3000
Current WatreMark : -3000
Current WatreMark : 1532934240168
Max TimeStamp : 1532934243176
Max TimeStamp : 1532934243176
Max TimeStamp : 1532934243184
Max TimeStamp : 1532934243200
Max TimeStamp : 1532934243208
Max TimeStamp : 1532934243184


Re: Flink wrong Watermark in Periodic watermark

Xingcan Cui
Hi Soheil,

That may be related to your parallelism, since each extractor instance computes its own watermarks. Try printing the max timestamps together with the current thread’s name and you will see this.

Best,
Xingcan
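
To make the suggestion above concrete, here is a minimal plain-Java sketch of how two independent extractor instances could produce exactly the mixed output from the question. It is only a simulation under assumptions: it uses no Flink classes, and the names ParallelWatermarkSketch, ExtractorInstance, subtask-0 and subtask-1 are hypothetical stand-ins for the parallel instances of SampleTimestampExtractor. The thread itself does not confirm that parallelism was the actual cause.

```java
public class ParallelWatermarkSketch {

    /** Hypothetical stand-in for one parallel instance of the assigner (no Flink classes). */
    static class ExtractorInstance {
        private static final long DELAY = 3000;
        // Each instance has its own field; it starts at 0, like MAX_TIMESTAMP in the question.
        private long maxTimestamp;

        long extractTimestamp(long timestamp) {
            maxTimestamp = Math.max(timestamp, maxTimestamp);
            return timestamp;
        }

        long currentWatermark() {
            return maxTimestamp - DELAY;
        }
    }

    /** Tags the watermark with the current thread's name, as suggested in the reply. */
    static String report(ExtractorInstance instance) {
        return Thread.currentThread().getName() + " -> watermark " + instance.currentWatermark();
    }

    public static void main(String[] args) throws InterruptedException {
        ExtractorInstance busy = new ExtractorInstance(); // receives an element
        ExtractorInstance idle = new ExtractorInstance(); // has not received any element yet

        Thread first = new Thread(() -> {
            busy.extractTimestamp(1532934243168L);
            System.out.println(report(busy));
        }, "subtask-0");
        Thread second = new Thread(() -> System.out.println(report(idle)), "subtask-1");

        // Run one after the other so the output order is deterministic.
        first.start();
        first.join();
        second.start();
        second.join();
        // prints:
        // subtask-0 -> watermark 1532934240168
        // subtask-1 -> watermark -3000
    }
}
```

In this sketch the instance that has seen no elements still holds the default value 0, so it reports 0 - 3000 = -3000, while the other instance reports the expected watermark. Adding Thread.currentThread().getName() to the two println calls in the real extractor should show whether the -3000 lines come from a different instance.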

On Jul 30, 2018, at 3:05 PM, Soheil Pourbafrani <[hidden email]> wrote:
