Proctime consistency

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Proctime consistency

Rex Fenley
Hello,

I'm looking at ways to deduplicate data and found [1], but does proctime get committed with operators? How does this work against clock skew on different machines?

Thanks


--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Proctime consistency

Timo Walther
Hi Rex,

processing-time gives you no alignment of operators across nodes. Each
operation works with its local machine clock that might be interrupted
by the OS, Java garbage collector, etc. It is always a best effort timing.

Regards,
Timo


On 27.01.21 18:16, Rex Fenley wrote:

> Hello,
>
> I'm looking at ways to deduplicate data and found [1], but does proctime
> get committed with operators? How does this work against clock skew on
> different machines?
>
> Thanks
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication>
>
> --
>
> Rex Fenley|Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> FOLLOW US <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>
>

Reply | Threaded
Open this post in threaded view
|

Re: Proctime consistency

Rex Fenley
We need to aggregate in precisely row order. Is there a safe way to do this? Maybe with some sort of row time sequence number?

As written in another email, we're currently doing the following set of operations
val compactedUserDocsStream = userDocsStream
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.aggregate(new CompactionAggregate())

I guess my concern is if we restore from a checkpoint or savepoint I don't understand how the window get's checkpointed and how window alignment works between runs of a job. Will the window just start over from scratch, and re-process any rows that may have been inflight but not finished processing in the previous run's last window?

If so then I guess everything will arrive in row order like we want it to. But if a window get's checkpointed with its previous proctime, then it may be misaligned in the next run and drop rows that were in that window.

On Mon, Feb 1, 2021 at 6:37 AM Timo Walther <[hidden email]> wrote:
Hi Rex,

processing-time gives you no alignment of operators across nodes. Each
operation works with its local machine clock that might be interrupted
by the OS, Java garbage collector, etc. It is always a best effort timing.

Regards,
Timo


On 27.01.21 18:16, Rex Fenley wrote:
> Hello,
>
> I'm looking at ways to deduplicate data and found [1], but does proctime
> get committed with operators? How does this work against clock skew on
> different machines?
>
> Thanks
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication>
>
> --
>
> Rex Fenley|Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> FOLLOW US <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>
>



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



Reply | Threaded
Open this post in threaded view
|

Re: Proctime consistency

Timo Walther
As far as I know, we support ROW_NUMBER in SQL that could give you
sequence number.

Regarding window semantics, the processing time only determines when to
trigger the evaluation (also mentioned here [1]). A timer is registered
for the next evaluation. The window content and next timer is part of
every checkpoint and savepoint. If you restore from a
checkpoint/savepoint, the stored next timestamp will be checked with the
current wall clock and an evaluation might be triggered immediately.
Thus, usually event-time is more useful than processing time. If you
have a lot of processing time timers set, they might all fire
immediately during a restore.

So the window will not start over from scratch. But inflight data that
was about to reach the window operator will be reread from the source
operator.

Timo


[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#triggers


On 01.02.21 20:06, Rex Fenley wrote:

> We need to aggregate in precisely row order. Is there a safe way to do
> this? Maybe with some sort of row time sequence number?
>
> As written in another email, we're currently doing the following set of
> operations
> valcompactedUserDocsStream = userDocsStream
> .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
> .aggregate(newCompactionAggregate())
>
> I guess my concern is if we restore from a checkpoint or savepoint I
> don't understand how the window get's checkpointed and how window
> alignment works between runs of a job. Will the window just start over
> from scratch, and re-process any rows that may have been inflight but
> not finished processing in the previous run's last window?
>
> If so then I guess everything will arrive in row order like we want it
> to. But if a window get's checkpointed with its previous proctime, then
> it may be misaligned in the next run and drop rows that were in that window.
>
> On Mon, Feb 1, 2021 at 6:37 AM Timo Walther <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     Hi Rex,
>
>     processing-time gives you no alignment of operators across nodes. Each
>     operation works with its local machine clock that might be interrupted
>     by the OS, Java garbage collector, etc. It is always a best effort
>     timing.
>
>     Regards,
>     Timo
>
>
>     On 27.01.21 18:16, Rex Fenley wrote:
>      > Hello,
>      >
>      > I'm looking at ways to deduplicate data and found [1], but does
>     proctime
>      > get committed with operators? How does this work against clock
>     skew on
>      > different machines?
>      >
>      > Thanks
>      >
>      > [1]
>      >
>     https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>     <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication>
>
>      >
>     <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>     <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication>>
>      >
>      > --
>      >
>      > Rex Fenley|Software Engineer - Mobile and Backend
>      >
>      >
>      > Remind.com <https://www.remind.com/ <https://www.remind.com/>>|
>     BLOG <http://blog.remind.com/ <http://blog.remind.com/>> |
>      > FOLLOW US <https://twitter.com/remindhq
>     <https://twitter.com/remindhq>> | LIKE US
>      > <https://www.facebook.com/remindhq
>     <https://www.facebook.com/remindhq>>
>      >
>
>
>
> --
>
> Rex Fenley|Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> FOLLOW US <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>
>
>
>

Reply | Threaded
Open this post in threaded view
|

Re: Proctime consistency

Rex Fenley
So if I'm reading this correctly, on checkpoint restore, if current machine time / proc time > checkpointed window proc time, the window will fire immediately with all the data it had aggregated. If current machine time < window proc time, the window will just continue where it left off until it hits the machine's clock time where it is meant to trigger.

That actually also seems perfectly fine for our use case. I see the concern if there are a lot of proc time windows building up how a lot of triggers firing could stress resources on a restore, but I don't think that will matter for our case, we just want to make sure we don't lose any data or have any gaps between windows.

Please confirm if I got this right, and thank you much for your reply!

On Tue, Feb 2, 2021 at 3:17 AM Timo Walther <[hidden email]> wrote:
As far as I know, we support ROW_NUMBER in SQL that could give you
sequence number.

Regarding window semantics, the processing time only determines when to
trigger the evaluation (also mentioned here [1]). A timer is registered
for the next evaluation. The window content and next timer is part of
every checkpoint and savepoint. If you restore from a
checkpoint/savepoint, the stored next timestamp will be checked with the
current wall clock and an evaluation might be triggered immediately.
Thus, usually event-time is more useful than processing time. If you
have a lot of processing time timers set, they might all fire
immediately during a restore.

So the window will not start over from scratch. But inflight data that
was about to reach the window operator will be reread from the source
operator.

Timo


[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#triggers


On 01.02.21 20:06, Rex Fenley wrote:
> We need to aggregate in precisely row order. Is there a safe way to do
> this? Maybe with some sort of row time sequence number?
>
> As written in another email, we're currently doing the following set of
> operations
> valcompactedUserDocsStream = userDocsStream
> .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
> .aggregate(newCompactionAggregate())
>
> I guess my concern is if we restore from a checkpoint or savepoint I
> don't understand how the window get's checkpointed and how window
> alignment works between runs of a job. Will the window just start over
> from scratch, and re-process any rows that may have been inflight but
> not finished processing in the previous run's last window?
>
> If so then I guess everything will arrive in row order like we want it
> to. But if a window get's checkpointed with its previous proctime, then
> it may be misaligned in the next run and drop rows that were in that window.
>
> On Mon, Feb 1, 2021 at 6:37 AM Timo Walther <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     Hi Rex,
>
>     processing-time gives you no alignment of operators across nodes. Each
>     operation works with its local machine clock that might be interrupted
>     by the OS, Java garbage collector, etc. It is always a best effort
>     timing.
>
>     Regards,
>     Timo
>
>
>     On 27.01.21 18:16, Rex Fenley wrote:
>      > Hello,
>      >
>      > I'm looking at ways to deduplicate data and found [1], but does
>     proctime
>      > get committed with operators? How does this work against clock
>     skew on
>      > different machines?
>      >
>      > Thanks
>      >
>      > [1]
>      >
>     https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>     <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication>
>
>      >
>     <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>     <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication>>
>      >
>      > --
>      >
>      > Rex Fenley|Software Engineer - Mobile and Backend
>      >
>      >
>      > Remind.com <https://www.remind.com/ <https://www.remind.com/>>|
>     BLOG <http://blog.remind.com/ <http://blog.remind.com/>> |
>      > FOLLOW US <https://twitter.com/remindhq
>     <https://twitter.com/remindhq>> | LIKE US
>      > <https://www.facebook.com/remindhq
>     <https://www.facebook.com/remindhq>>
>      >
>
>
>
> --
>
> Rex Fenley|Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> FOLLOW US <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>
>
>
>



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Proctime consistency

Timo Walther
Hi Rex,

sorry for replying so late. Yes, your summary should be correct.

In many cases this processing time stress on restore is the reason why
people select event time eventually. But if that is fine for your use
case, that's great.

Regards,
Timo

On 05.02.21 06:26, Rex Fenley wrote:

> So if I'm reading this correctly, on checkpoint restore, if current
> machine time / proc time > checkpointed window proc time, the window
> will fire immediately with all the data it had aggregated. If current
> machine time < window proc time, the window will just continue where it
> left off until it hits the machine's clock time where it is meant to
> trigger.
>
> That actually also seems perfectly fine for our use case. I see the
> concern if there are a lot of proc time windows building up how a lot of
> triggers firing could stress resources on a restore, but I don't think
> that will matter for our case, we just want to make sure we don't lose
> any data or have any gaps between windows.
>
> Please confirm if I got this right, and thank you much for your reply!
>
> On Tue, Feb 2, 2021 at 3:17 AM Timo Walther <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     As far as I know, we support ROW_NUMBER in SQL that could give you
>     sequence number.
>
>     Regarding window semantics, the processing time only determines when to
>     trigger the evaluation (also mentioned here [1]). A timer is registered
>     for the next evaluation. The window content and next timer is part of
>     every checkpoint and savepoint. If you restore from a
>     checkpoint/savepoint, the stored next timestamp will be checked with
>     the
>     current wall clock and an evaluation might be triggered immediately.
>     Thus, usually event-time is more useful than processing time. If you
>     have a lot of processing time timers set, they might all fire
>     immediately during a restore.
>
>     So the window will not start over from scratch. But inflight data that
>     was about to reach the window operator will be reread from the source
>     operator.
>
>     Timo
>
>
>     [1]
>     https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#triggers
>     <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#triggers>
>
>
>     On 01.02.21 20:06, Rex Fenley wrote:
>      > We need to aggregate in precisely row order. Is there a safe way
>     to do
>      > this? Maybe with some sort of row time sequence number?
>      >
>      > As written in another email, we're currently doing the following
>     set of
>      > operations
>      > valcompactedUserDocsStream = userDocsStream
>      > .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
>      > .aggregate(newCompactionAggregate())
>      >
>      > I guess my concern is if we restore from a checkpoint or savepoint I
>      > don't understand how the window get's checkpointed and how window
>      > alignment works between runs of a job. Will the window just start
>     over
>      > from scratch, and re-process any rows that may have been inflight
>     but
>      > not finished processing in the previous run's last window?
>      >
>      > If so then I guess everything will arrive in row order like we
>     want it
>      > to. But if a window get's checkpointed with its previous
>     proctime, then
>      > it may be misaligned in the next run and drop rows that were in
>     that window.
>      >
>      > On Mon, Feb 1, 2021 at 6:37 AM Timo Walther <[hidden email]
>     <mailto:[hidden email]>
>      > <mailto:[hidden email] <mailto:[hidden email]>>> wrote:
>      >
>      >     Hi Rex,
>      >
>      >     processing-time gives you no alignment of operators across
>     nodes. Each
>      >     operation works with its local machine clock that might be
>     interrupted
>      >     by the OS, Java garbage collector, etc. It is always a best
>     effort
>      >     timing.
>      >
>      >     Regards,
>      >     Timo
>      >
>      >
>      >     On 27.01.21 18:16, Rex Fenley wrote:
>      >      > Hello,
>      >      >
>      >      > I'm looking at ways to deduplicate data and found [1], but
>     does
>      >     proctime
>      >      > get committed with operators? How does this work against clock
>      >     skew on
>      >      > different machines?
>      >      >
>      >      > Thanks
>      >      >
>      >      > [1]
>      >      >
>      >
>     https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>     <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication>
>      >  
>       <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication>>
>      >
>      >      >
>      >  
>       <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication>
>      >  
>       <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication>>>
>      >      >
>      >      > --
>      >      >
>      >      > Rex Fenley|Software Engineer - Mobile and Backend
>      >      >
>      >      >
>      >      > Remind.com <https://www.remind.com/
>     <https://www.remind.com/> <https://www.remind.com/
>     <https://www.remind.com/>>>|
>      >     BLOG <http://blog.remind.com/ <http://blog.remind.com/>
>     <http://blog.remind.com/ <http://blog.remind.com/>>> |
>      >      > FOLLOW US <https://twitter.com/remindhq
>     <https://twitter.com/remindhq>
>      >     <https://twitter.com/remindhq
>     <https://twitter.com/remindhq>>> | LIKE US
>      >      > <https://www.facebook.com/remindhq
>     <https://www.facebook.com/remindhq>
>      >     <https://www.facebook.com/remindhq
>     <https://www.facebook.com/remindhq>>>
>      >      >
>      >
>      >
>      >
>      > --
>      >
>      > Rex Fenley|Software Engineer - Mobile and Backend
>      >
>      >
>      > Remind.com <https://www.remind.com/ <https://www.remind.com/>>|
>     BLOG <http://blog.remind.com/ <http://blog.remind.com/>> |
>      > FOLLOW US <https://twitter.com/remindhq
>     <https://twitter.com/remindhq>> | LIKE US
>      > <https://www.facebook.com/remindhq
>     <https://www.facebook.com/remindhq>>
>      >
>      >
>      >
>
>
>
> --
>
> Rex Fenley|Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> FOLLOW US <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>
>