Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

Jayant Ameta
Hi,
When using a BoundedOutOfOrdernessTimestampExtractor, the trigger is not firing. However, the trigger does fire when I use a custom timestamp extractor with a similar watermark.

Sample code below:
1. Assigner as an anonymous class, which works fine:
AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner = new AssignerWithPeriodicWatermarks<Tuple2<Rule, T>>() {
    @Override
    public long extractTimestamp(Tuple2<Rule, T> element, long previousElementTimestamp) {
        return System.currentTimeMillis();
    }

    @Override
    public final Watermark getCurrentWatermark() {
        // this guarantees that the watermark never goes backwards.
        return new Watermark(System.currentTimeMillis() - 100);
    }
};

2. BoundedOutOfOrdernessTimestampExtractor assigner, which doesn't work:
AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner = new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Rule, T>>(Time.milliseconds(100)) {
    @Override
    public long extractTimestamp(Tuple2<Rule, T> element) {
        return System.currentTimeMillis();
    }
};

Do you see any difference in the approaches?

- Jayant

Re: Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

Gary Yao-2
Hi Jayant,

The difference is that the Watermarks from
BoundedOutOfOrdernessTimestampExtractor are based on the greatest timestamp of
all previous events. That is, if you do not receive new events, the Watermark
will not advance. In contrast, your custom implementation of
AssignerWithPeriodicWatermarks always advances the Watermark based on the wall
clock.
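
To make the difference concrete, here is a minimal sketch in plain Java. It is a simplified model of the two strategies, not Flink's actual classes; the class name WatermarkModel, the method names, and the sample timestamps are made up for illustration. It assumes an event-time window fires once the watermark reaches the window's maximum timestamp.

```java
// Simplified model of the two watermark strategies discussed above.
// This is NOT Flink code; all names and numbers here are hypothetical.
class WatermarkModel {

    // Data-driven: the watermark trails the largest event timestamp seen so far.
    // With no new events, the result never changes.
    static long dataDrivenWatermark(long maxOutOfOrdernessMs, long... seenTimestamps) {
        // Start low enough that subtracting the bound cannot underflow.
        long max = Long.MIN_VALUE + maxOutOfOrdernessMs;
        for (long ts : seenTimestamps) {
            max = Math.max(max, ts);
        }
        return max - maxOutOfOrdernessMs;
    }

    // Clock-driven: the watermark trails the wall clock, events or not.
    static long clockDrivenWatermark(long nowMs, long lagMs) {
        return nowMs - lagMs;
    }

    // Assumed firing rule: the watermark has reached the window's last timestamp.
    static boolean windowFires(long watermark, long windowMaxTimestamp) {
        return watermark >= windowMaxTimestamp;
    }

    public static void main(String[] args) {
        long windowMaxTimestamp = 1_999L;  // window covering [0, 2000)
        long now = 11_050L;                // 10 seconds after the last event

        long dataWm = dataDrivenWatermark(100, 1_000L, 1_050L, 1_020L);
        long clockWm = clockDrivenWatermark(now, 100);

        // prints "data-driven: 950 fires=false"
        System.out.println("data-driven: " + dataWm + " fires=" + windowFires(dataWm, windowMaxTimestamp));
        // prints "clock-driven: 10950 fires=true"
        System.out.println("clock-driven: " + clockWm + " fires=" + windowFires(clockWm, windowMaxTimestamp));
    }
}
```

With a fixed set of three events, the data-driven watermark stays at 950 however long you wait, so the window never closes; the clock-driven one passes the window end on its own.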

Maybe this will already help you to debug your application. If not, it would be
great to see a minimal working example.

Best,
Gary


Re: Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

Jayant Ameta
Thanks Gary,
I was only testing with a fixed set of events, so the watermark was not advancing, as you said.


Jayant Ameta


Re: Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

Fabian Hueske-2
Another thing to point out is that watermarks are usually data-driven, i.e., they depend on the timestamps of the events and not on the clock of the machine.
Otherwise, you might observe a lot of late data, i.e., events with timestamps smaller than the last watermark.
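
As a rough illustration of that late-data effect, the sketch below counts events that arrive behind the watermark under both strategies. It is a plain-Java model, not Flink code; the class LateDataModel, its methods, and the sample events are hypothetical, and it assumes each event carries its own timestamp plus the wall-clock time at which it arrived.

```java
// Plain-Java model of "late" events, i.e. events whose timestamp is smaller
// than the current watermark. Hypothetical names and data; not Flink code.
class LateDataModel {

    // Each event is {eventTimestamp, arrivalWallClock}, listed in arrival order.
    // The watermark is derived from the wall clock at arrival time.
    static int countLateClockDriven(long[][] events, long lagMs) {
        int late = 0;
        for (long[] e : events) {
            long watermark = e[1] - lagMs;
            if (e[0] < watermark) {
                late++;
            }
        }
        return late;
    }

    // The watermark is derived from the largest event timestamp seen before
    // the current event, minus the allowed out-of-orderness.
    static int countLateDataDriven(long[][] events, long maxOutOfOrdernessMs) {
        int late = 0;
        long maxSeen = Long.MIN_VALUE + maxOutOfOrdernessMs;
        for (long[] e : events) {
            long watermark = maxSeen - maxOutOfOrdernessMs;
            if (e[0] < watermark) {
                late++;
            }
            maxSeen = Math.max(maxSeen, e[0]);
        }
        return late;
    }

    public static void main(String[] args) {
        long[][] events = {
            {1_000L, 1_020L},  // arrives 20 ms after it happened
            {1_600L, 1_650L},  // arrives 50 ms after it happened
            {1_100L, 1_700L},  // delayed in transit by 600 ms
        };
        System.out.println(countLateClockDriven(events, 100)); // prints 1
        System.out.println(countLateDataDriven(events, 600));  // prints 0
    }
}
```

The delayed third event is late against the clock-based watermark, while a data-driven watermark with a bound matched to the expected disorder still accepts it.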

If you assign timestamps and watermarks based on the clock of the machine, you might as well use ingestion time instead of event time.


Re: Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

Jayant Ameta
Hi Fabian,
I want to extract timestamps from my events. However, the event stream can be sparse at times (e.g. 2 days without any events).
What's the best strategy to create watermarks if I want real-time processing of the events which enter the stream?

Jayant Ameta


Re: Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

Fabian Hueske-2
This depends on the requirements of your application.
With the usual watermark generation strategies, which are purely data-driven, a stream that does not produce data does not advance its watermarks.
Not advancing the watermarks means that the program cannot make progress.

This might be fine if your program consumes a single stream, because if this stream does not produce data, your program also doesn't have anything to compute (although there might still be data left, such as a window, that has not been computed).
The situation becomes trickier if your program has multiple sources that become inactive at some point, or a source where a partition can become inactive.

AFAIK, there is a mechanism to mark partitions (and maybe complete sources) as inactive.
@Gordon (in CC) knows more about this feature.
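
To illustrate why inactive inputs are a problem, here is a small plain-Java model. It relies on the fact that an operator with several inputs follows the minimum of their watermarks, and it models "inactive" as a simple flag that excludes an input from that minimum. The class MultiInputWatermarkModel and its method are hypothetical, and returning Long.MIN_VALUE when every input is inactive is a choice made for this sketch only.

```java
// Model of how an operator with several inputs combines their watermarks.
// Hypothetical names; not Flink code.
class MultiInputWatermarkModel {

    // The operator's watermark is the minimum over all inputs that are not
    // flagged as idle. A single silent input that is still counted as active
    // therefore holds everything back.
    static long operatorWatermark(long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, inputWatermarks[i]);
            }
        }
        // Sketch-only convention: no active input means no usable watermark.
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long[] watermarks = {5_000L, 1_200L};  // second input stopped at 1200

        // Both inputs counted: progress is stuck at the silent input's watermark.
        System.out.println(operatorWatermark(watermarks, new boolean[] {false, false})); // prints 1200

        // Second input flagged idle: the operator follows the active input.
        System.out.println(operatorWatermark(watermarks, new boolean[] {false, true}));  // prints 5000
    }
}
```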

Best, Fabian


Re: Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

Aljoscha Krettek
A while back I wrote this slightly more elaborate extractor that will advance the watermark independently after the stream is idle for a while: https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java
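
For readers who only want the general idea, the following is a minimal plain-Java sketch of an idle-aware watermark generator. It is not a reproduction of the linked class: the class name IdleAwareWatermarkGenerator, the idleTimeoutMs parameter, and the rule "once idle longer than the timeout, advance the watermark by the extra idle time" are assumptions made for this example. The clock is injected so that the behaviour can be checked deterministically.

```java
import java.util.function.LongSupplier;

// Sketch of a watermark generator that is data-driven while events arrive and
// starts following the wall clock once the stream has been idle for a while.
// Hypothetical design; not the implementation behind the link above.
class IdleAwareWatermarkGenerator {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private final LongSupplier clock;        // wall clock, injectable for tests

    private boolean seenEvent = false;
    private long maxTimestamp;
    private long lastEventWallClock;
    private long lastEmitted = Long.MIN_VALUE;

    IdleAwareWatermarkGenerator(long maxOutOfOrdernessMs, long idleTimeoutMs, LongSupplier clock) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.clock = clock;
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
        this.lastEventWallClock = clock.getAsLong();
    }

    // Call for every event, passing the timestamp extracted from the event.
    void onEvent(long eventTimestamp) {
        seenEvent = true;
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventWallClock = clock.getAsLong();
    }

    // Call periodically. The returned value never decreases.
    long currentWatermark() {
        long candidate = maxTimestamp - maxOutOfOrdernessMs;
        long idleFor = clock.getAsLong() - lastEventWallClock;
        if (seenEvent && idleFor > idleTimeoutMs) {
            // Idle: let the watermark drift forward with the wall clock.
            candidate += idleFor - idleTimeoutMs;
        }
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }

    public static void main(String[] args) {
        long[] now = {0L};  // fake wall clock
        IdleAwareWatermarkGenerator gen =
                new IdleAwareWatermarkGenerator(100, 1_000, () -> now[0]);

        gen.onEvent(5_000L);
        now[0] = 500L;
        System.out.println(gen.currentWatermark()); // prints 4900 (still data-driven)

        now[0] = 3_000L;
        System.out.println(gen.currentWatermark()); // prints 6900 (idle 2000 ms past the timeout)
    }
}
```

The trade-off of any scheme like this is that an event arriving after a long pause with an old timestamp can end up behind the watermark and be treated as late.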

Best,
Aljoscha
