Timestamp Watermark Assigner bound question

Vijay Balakrishnan
Hi,
I have created a TimestampAssigner as follows.
I want to use monitoring.getEventTimestamp() for event-time processing and collect aggregated stats over time windows of 5 secs, 5 mins, etc. Is this the right way to create the timestamp/watermark assigner with a bound? I want to collect the stats for each eventTimestamp + window interval. My understanding is that the generated watermark, eventTimestamp + bound, will collect all events whose eventTimestamp arrives within that watermark into each eventTimestamp + 5s (etc.) window. Or does the bound have to be based on the window interval, i.e. extractedTimestamp + windowInterval + bound?

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {
    private final long bound = 5 * 1000L; // 5 seconds, in milliseconds

    @Override
    public long extractTimestamp(Monitoring monitoring, long previousTS) {
        return monitoring.getEventTimestamp();
    }

    @Override
    public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
        return new Watermark(extractedTimestamp + bound); // <==== should it be - bound?
    }
}

Used here:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
final DataStreamSource<Monitoring> monitoringDataStreamSource = env.addSource(....);
DataStream<Monitoring> kinesisStream = monitoringDataStreamSource.assignTimestampsAndWatermarks(new MonitoringTSWAssigner());
KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", .....);
final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
                monitoringTupleKeyedStream.timeWindow(Time.seconds(5));//5 sec time window

TIA, 

Re: Timestamp Watermark Assigner bound question

Guowei Ma
Hi,
1. From doc[1]: a Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t' <= t (i.e. events with timestamps older than or equal to the watermark). So generating a watermark that is larger than the timestamp of the current element is counterintuitive; at the very least you should subtract the bound.
2. By that definition, the watermark is not related to the length of the window. The bound depends on your application, i.e. on how far out of order your events can arrive.
3. AssignerWithPunctuatedWatermarks might not be a good choice in your case. Watermarks are not free, and you might send too many of them. The interface suits sources that emit special "watermark" elements. Otherwise, you could choose AssignerWithPeriodicWatermarks; you can find an example in doc[2].
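To make point 1 concrete, here is a minimal, dependency-free Java simulation. It does not use the Flink API, and the class and method names are hypothetical. It feeds event timestamps in arrival order, computes a candidate watermark of timestamp + offsetMs per element (as MonitoringTSWAssigner does, with offsetMs set to +bound or -bound), assumes a watermark that does not advance is simply ignored, and, following the definition quoted from doc[1], flags an element as late when its timestamp is <= the current watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of per-element (punctuated) watermarks; not Flink API.
class WatermarkBoundSketch {

    /**
     * Returns the timestamps that arrive at or below the current watermark,
     * i.e. elements that should no longer show up according to doc[1].
     *
     * @param timestamps event timestamps in arrival order (ms)
     * @param offsetMs   +bound mimics "extractedTimestamp + bound",
     *                   -bound mimics "extractedTimestamp - bound"
     */
    static List<Long> lateElements(long[] timestamps, long offsetMs) {
        List<Long> late = new ArrayList<>();
        long watermark = Long.MIN_VALUE; // no watermark emitted yet
        for (long ts : timestamps) {
            if (ts <= watermark) {
                late.add(ts); // arrived behind the current watermark
            }
            long candidate = ts + offsetMs;
            // Assumption: a candidate that does not advance the watermark is ignored.
            watermark = Math.max(watermark, candidate);
        }
        return late;
    }
}
```

For the arrival sequence 10000, 12000, 11000, 16000, 13000 with a 5000 ms bound, lateElements(ts, 5000) returns [12000, 11000, 16000, 13000], while lateElements(ts, -5000) returns an empty list. With + bound, the watermark after the very first element is already 15000, past the end of the 5 s window [10000, 15000), even though events belonging to that window are still arriving.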

(In reply to Vijay Balakrishnan, Wed, Apr 10, 2019 at 7:41 AM)

Re: Timestamp Watermark Assigner bound question

Vijay Balakrishnan
Hi Guowei,
Thanks for your reply.
I am trying to understand the logic behind point 1, i.e. the current watermark being currMaxTimestamp minus the bound.
So does this mean that, for the operator processing a task, current event time < current watermark < currMaxTimestamp? And does the operator then progress to the next watermark as the starting point for events once event time reaches the current watermark?
Also, I saw this comment in BoundedOutOfOrdernessTimestampExtractor.java:
// this guarantees that the watermark never goes backwards.
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;

How does this guarantee that the watermark never goes backwards?
TIA,
Vijay


(In reply to Guowei Ma, Tue, Apr 9, 2019 at 10:50 PM)

Re: Timestamp Watermark Assigner bound question

Guowei Ma
Hi Vijay,

>>> Then the Operator progresses to the next Watermark as a starting point for events after event time reaches currWatermark?
AFAIK, the operator that generates watermarks is called by the framework, and when it is called depends on the operator itself. For example, an operator using the AssignerWithPunctuatedWatermarks interface is called for every element, whereas an AssignerWithPeriodicWatermarks is polled periodically.
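To illustrate why calling the assigner for every element can produce many watermarks, here is a hypothetical, dependency-free Java sketch (not Flink API). It assumes one element arrives per millisecond with increasing timestamps and a periodic poll every 200 ms; the 200 ms is only an illustrative value (in Flink the periodic interval is configured with ExecutionConfig#setAutoWatermarkInterval). It counts how many advancing watermarks each style would produce.

```java
// Hypothetical counting sketch comparing punctuated and periodic styles; not Flink API.
class AssignerFrequencySketch {

    /** Punctuated style: one candidate per element (timestamp - bound); count those that advance. */
    static int punctuatedEmissions(long[] timestamps, long bound) {
        long last = Long.MIN_VALUE;
        int emitted = 0;
        for (long ts : timestamps) {
            long candidate = ts - bound;
            if (candidate > last) {
                last = candidate;
                emitted++;
            }
        }
        return emitted;
    }

    /**
     * Periodic style: assuming one element per millisecond, a poll is due after
     * every pollEveryElements elements; each poll proposes maxTimestamp - bound.
     */
    static int periodicEmissions(long[] timestamps, long bound, int pollEveryElements) {
        long last = Long.MIN_VALUE;
        long maxTimestamp = Long.MIN_VALUE;
        int emitted = 0;
        for (int i = 0; i < timestamps.length; i++) {
            maxTimestamp = Math.max(maxTimestamp, timestamps[i]);
            if ((i + 1) % pollEveryElements == 0) { // a periodic poll is due
                long candidate = maxTimestamp - bound;
                if (candidate > last) {
                    last = candidate;
                    emitted++;
                }
            }
        }
        return emitted;
    }
}
```

With 1000 elements whose timestamps increase by 1 ms, punctuatedEmissions(ts, 5000) returns 1000, while periodicEmissions(ts, 5000, 200) returns 5.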

>>> How does it guarantee that the watermark never goes backwards?
Whether a watermark generated by an AssignerWithPunctuatedWatermarks/AssignerWithPeriodicWatermarks is sent downstream is controlled by the framework. If an operator returns a watermark that goes backwards, Flink would send it downstream.

Best,
Guowei


(In reply to Vijay Balakrishnan, Wed, Apr 10, 2019 at 11:44 PM)

Re: Timestamp Watermark Assigner bound question

Guowei Ma
Sorry, I missed a "not". :(
Whether a watermark generated by an AssignerWithPunctuatedWatermarks/AssignerWithPeriodicWatermarks is sent downstream is controlled by the framework. If an operator returns a watermark that goes backwards, Flink would _not_ send it downstream.
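Inside BoundedOutOfOrdernessTimestampExtractor itself, as far as we can tell from its source, two things keep the watermark from going backwards: currentMaxTimestamp is only ever raised, and the candidate potentialWM is compared with the last emitted watermark before it is returned. The following dependency-free Java sketch mirrors that logic; the class name is hypothetical and it is not the Flink class. On top of this, the framework would not forward a watermark that goes backwards, as described above.

```java
// Hypothetical sketch modelled on BoundedOutOfOrdernessTimestampExtractor's logic; not Flink API.
class MonotonicWatermarkSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    MonotonicWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start here so that currentMaxTimestamp - maxOutOfOrderness cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Called per element: the max timestamp is only ever raised, never lowered. */
    long extractTimestamp(long eventTimestamp) {
        if (eventTimestamp > currentMaxTimestamp) {
            currentMaxTimestamp = eventTimestamp;
        }
        return eventTimestamp;
    }

    /** Called periodically: never returns a value below the previously returned one. */
    long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }
}
```

For example, with maxOutOfOrderness = 5000, calling extractTimestamp(20000) and then currentWatermark() gives 15000; a late extractTimestamp(12000) leaves it at 15000 rather than moving back to 7000; extractTimestamp(21000) then raises it to 16000.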


Best,
Guowei


(In reply to Guowei Ma, Mon, Apr 15, 2019 at 9:44 AM)