How can I drop events which are late by more than X hours/days?


How can I drop events which are late by more than X hours/days?

orips
I need to drop elements which are delayed by more than a certain amount of time from the current watermark.

I wanted to create a FilterFunction where I get the current watermark and, if the difference between the watermark and my element's timestamp is greater than X, drop the element.

However, I do not have access to the current watermark inside any of Flink's operators/functions, including FilterFunction.

How can such functionality be achieved?

Re: How can I drop events which are late by more than X hours/days?

Matthias
Hi Ori,
one way to do it is to implement a basic ProcessFunction. ProcessFunction.processElement(I value, Context ctx, Collector<O> out) gives you access to the context, through which you can get the current watermark timestamp via ctx.timerService().currentWatermark(). You can use that to filter out delayed events.
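A minimal sketch of the check described above, assuming the maximum lateness X is given as a `java.time.Duration`. The class `LatenessFilter` and its method `isTooLate` are hypothetical names; only `ctx.timestamp()`, `ctx.timerService().currentWatermark()` and `out.collect(...)` are Flink API, and they appear in a comment so that the helper compiles without Flink on the classpath. The sketch also guards against the case where no watermark has arrived yet (Flink then reports `Long.MIN_VALUE`), in which `watermark - timestamp` would overflow and could end up dropping every element.

```java
import java.time.Duration;

/**
 * Hypothetical helper (not part of Flink): decides whether an element is late
 * by more than maxLateness relative to the current watermark. Inside a Flink
 * ProcessFunction#processElement(value, ctx, out) it could be used like this:
 *
 *   Long ts = ctx.timestamp();                        // null if no timestamp was assigned
 *   long wm = ctx.timerService().currentWatermark();
 *   if (!LatenessFilter.isTooLate(ts, wm, maxLateness)) {
 *       out.collect(value);                           // keep the element
 *   }                                                 // otherwise it is dropped
 */
final class LatenessFilter {

    private LatenessFilter() {}

    static boolean isTooLate(Long eventTimestamp, long currentWatermark, Duration maxLateness) {
        // Without a timestamp we cannot judge lateness; this sketch keeps such elements.
        if (eventTimestamp == null) {
            return false;
        }
        // Before the first watermark the watermark is Long.MIN_VALUE. Nothing is late yet,
        // and currentWatermark - eventTimestamp would overflow to a huge positive value.
        if (currentWatermark == Long.MIN_VALUE) {
            return false;
        }
        long lateness = currentWatermark - eventTimestamp;   // how far behind the watermark
        return lateness > maxLateness.toMillis();
    }
}
```

Keeping elements without a timestamp is an arbitrary choice in this sketch; dropping them instead may suit a given job better.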

Best,
Matthias

On Thu, Sep 24, 2020 at 9:59 AM Ori Popowski <[hidden email]> wrote:


--

Matthias Pohl | Engineer


Follow us @VervericaData Ververica

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner

Re: How can I drop events which are late by more than X hours/days?

Arvid Heise-3
Hi Ori,

if you use windows, Flink already has a solution on board: allowed lateness [1].

By default, Flink's window operators drop late records, that is, records that arrive after the watermark has passed the end of their window. By setting the allowed lateness to X, you can still have these elements processed.

If you end up treating all late events as normal events and you don't want to fire when the watermark arrives (e.g., you want to avoid late firings), you probably just want to adjust your watermark strategy to reflect X and keep the allowed lateness at 0. That is, if your current watermark strategy is watermark = max(record timestamp), you can simply switch to watermark = max(record timestamp) - X.
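To make the watermark arithmetic concrete, here is a plain-Java model (not Flink code) of the strategy watermark = max(record timestamp) - X, combined with a tumbling event-time window and an allowed lateness of 0. The class `WatermarkModel` and its methods are hypothetical. In Flink the strategy roughly corresponds to `WatermarkStrategy.forBoundedOutOfOrderness(Duration)` (which, as far as I know, subtracts an additional 1 ms), and allowed lateness is set with `allowedLateness(...)` on a windowed stream. The drop rule below follows Flink's documented default: a record is dropped once the watermark has passed the end of its window.

```java
/**
 * Hypothetical plain-Java model (not Flink code) of:
 *  1. the watermark strategy watermark = max(record timestamp) - X, and
 *  2. a tumbling event-time window with allowed lateness 0, which drops a
 *     record once the watermark has passed the end of the record's window.
 */
final class WatermarkModel {
    private final long outOfOrdernessMillis;      // X
    private long maxTimestamp = Long.MIN_VALUE;   // largest event timestamp seen so far

    WatermarkModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
    }

    /** Track the largest event timestamp; older events never move the watermark back. */
    void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** watermark = max(record timestamp) - X; Long.MIN_VALUE until the first event. */
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestamp - outOfOrdernessMillis;
    }

    /** Would a tumbling window of the given size (offset 0) drop this record? */
    boolean droppedByTumblingWindow(long timestamp, long windowSizeMillis) {
        long windowStart = timestamp - Math.floorMod(timestamp, windowSizeMillis);
        long windowMaxTimestamp = windowStart + windowSizeMillis - 1;   // last ms of the window
        return windowMaxTimestamp <= currentWatermark();
    }
}
```

For example, with X = 5 s and 10 s windows, after an event at t = 30 s the watermark is 25 s: a record at t = 12 s is dropped (its window ends at 19.999 s), while a record at t = 24 s is still accepted because its window [20 s, 30 s) has not closed yet.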


On Thu, Sep 24, 2020 at 12:14 PM Matthias Pohl <[hidden email]> wrote:


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: How can I drop events which are late by more than X hours/days?

Theo
Hi Arvid,

be aware that allowedLateness is only applied when your job uses windowing. If you have late events and only apply map functions such as enrichment, then, as far as I know, the events won't be filtered out automatically.

Best regards
Theo


From: "Arvid Heise" <[hidden email]>
To: "Matthias Pohl" <[hidden email]>
CC: "Ori Popowski" <[hidden email]>, "user" <[hidden email]>
Sent: Friday, 25 September 2020 07:58:40
Subject: Re: How can I drop events which are late by more than X hours/days?


Re: How can I drop events which are late by more than X hours/days?

Arvid Heise-3
Apparently, I didn't make that clear enough in my first mail, so thanks for clarifying, Theo.

As Matthias wrote, the general solution is to use a (Keyed)ProcessFunction [1]. However, if OP uses watermarks, chances are high that OP uses them for windows, so I wanted to point out the intended way for that case.


On Fri, Sep 25, 2020 at 9:27 AM Theo Diefenthal <[hidden email]> wrote: