Watermarks in Event Time Temporal Join

maverick
Hi,
I'm curious why the Event Time Temporal Join needs watermarks from both sides to
perform the join.

Shouldn't the watermark on the versioned table side be enough to perform the join?





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Watermarks in Event Time Temporal Join

Shengkai Fang
Hi, maverick.

The watermark is used to determine whether a message is late or early. If we only used the watermark on the versioned table side, we would have no way to determine whether an event in the main stream is ready to be emitted.
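A minimal Python sketch of this point, assuming a simplified model rather than Flink's actual operator code: a two-input operator advances its event-time clock to the minimum of its two input watermarks, and a main-stream row is joined only once that clock reaches the row's timestamp. The names `combined_watermark`, `ready_to_emit`, and `NO_WATERMARK` are hypothetical.

```python
import math

# Toy model (not Flink's code): a two-input operator such as the event-time
# temporal join tracks one watermark per input and advances its own
# event-time clock to the minimum of the two.
NO_WATERMARK = -math.inf  # an input that has not produced a watermark yet


def combined_watermark(left_wm: float, right_wm: float) -> float:
    return min(left_wm, right_wm)


def ready_to_emit(left_ts: int, left_wm: float, right_wm: float) -> bool:
    # A left (main-stream) row can be joined and emitted only once the
    # operator's event-time clock has reached its timestamp.
    return left_ts <= combined_watermark(left_wm, right_wm)


print(ready_to_emit(1, left_wm=NO_WATERMARK, right_wm=100))  # False
print(ready_to_emit(1, left_wm=5, right_wm=100))             # True
```

In this model the versioned side can be far ahead (100), but without any watermark from the main stream the combined clock never moves.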

Best,
Shengkai

(In reply to maverick's message of Monday, 26 April 2021, 02:31.)

Re: Watermarks in Event Time Temporal Join

maverick
Hi Shengkai,
Thanks for the answer. The question is whether we need to determine if an
event in the main stream is late at all.
Look at the interval join: an event is emitted as soon as there is a
match between the left and right streams.
I agree the watermark is required on the versioned table side, because
this is the only way to know which version of a record should be used.
But if we mimic the behaviour of the interval join, the main stream's watermark
could be skipped.

Regards,
Maciek

(In reply to Shengkai Fang's message of Monday, 26 April 2021, 06:14.)



--
Maciek Bryński

Re: Watermarks in Event Time Temporal Join

Leonard Xu
Hello, Maciej
> I agree the watermark is required on the versioned table side, because
> this is the only way to know which version of a record should be used.
> But if we mimic the behaviour of the interval join, the main stream's watermark
> could be skipped.

IIRC, the rowtime interval join also requires watermarks on both sides. In both the rowtime interval join and the rowtime temporal join, the watermark is used to clean up outdated data and to advance event-time progress.
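To make the cleanup role concrete, here is a hypothetical Python sketch (not Flink's implementation) for the condition `ts1 BETWEEN ts2 - 1 AND ts2` used elsewhere in this thread. It assumes a single operator watermark `wm` equal to the minimum of both input watermarks, and that every future non-late row has a timestamp greater than `wm`. The function `cleanup` and the dict-based rows are illustrative only.

```python
import math


# Hypothetical toy model of watermark-driven state cleanup in an event-time
# interval join with the condition ts2 - 1 <= ts1 <= ts2.
# Assumption: every future non-late row on either side has ts > wm.
def cleanup(s1_rows, s2_rows, wm):
    # An S1 row can only match S2 rows with ts2 in [ts1, ts1 + 1];
    # once wm >= ts1 + 1, no future S2 row can match it.
    kept_s1 = [r for r in s1_rows if r["ts"] + 1 > wm]
    # An S2 row can only match S1 rows with ts1 in [ts2 - 1, ts2];
    # once wm >= ts2, no future S1 row can match it.
    kept_s2 = [r for r in s2_rows if r["ts"] > wm]
    return kept_s1, kept_s2


s1 = [{"id": "id1", "ts": 1}]
s2 = [{"id": "id1", "ts": 1}, {"id": "id2", "ts": 101}]

# S2's watermark is 100, but S1 has produced no watermark yet, so the
# operator watermark min(-inf, 100) stays at -inf and nothing is dropped.
print(cleanup(s1, s2, wm=min(-math.inf, 100)))
# Once S1's watermark reaches 50, old rows on both sides can be dropped.
print(cleanup(s1, s2, wm=min(50, 100)))  # ([], [{'id': 'id2', 'ts': 101}])
```

Without a watermark on one side, the minimum never advances and the buffered state can only grow, which is why both sides need a watermark even though matches are emitted eagerly.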

Best,
Leonard


Re: Watermarks in Event Time Temporal Join

maverick
Hi Leonard,
Let's assume we have two streams.
S1 - id, value1, ts1 with watermark = ts1 - 1
S2 - id, value2, ts2 with watermark = ts2 - 1

Then we have the following interval join:
SELECT S1.id, value1, value2, ts1 FROM S1 JOIN S2 ON S1.id = S2.id AND
ts1 BETWEEN ts2 - 1 AND ts2

Let's take these events:
stream, id, value, ts
S1, id1, v1, 1
S2, id1, v2, 1

For these events, Flink emits the following row to the output stream of the interval join:
id1, v1, v2, 1
even though neither stream's watermark has reached ts = 1 yet.
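This interval-join behaviour can be reproduced with a toy Python model (hypothetical, not Flink's operator): each arriving row is buffered and immediately probed against the other side's buffer, so matches are emitted without waiting for any watermark. State cleanup, which is where watermarks would come in, is deliberately omitted; `interval_join` is an illustrative name.

```python
# Hypothetical toy model of an event-time interval join for
#   S1.id = S2.id AND ts1 BETWEEN ts2 - 1 AND ts2
# Each arriving row is buffered and immediately probed against the other
# side's buffer; matches are emitted without waiting for any watermark.
def interval_join(events):
    s1_buf, s2_buf, out = [], [], []
    for stream, key, value, ts in events:
        if stream == "S1":
            s1_buf.append((key, value, ts))
            for k2, v2, ts2 in s2_buf:
                if k2 == key and ts2 - 1 <= ts <= ts2:
                    out.append((key, value, v2, ts))
        else:
            s2_buf.append((key, value, ts))
            for k1, v1, ts1 in s1_buf:
                if k1 == key and ts - 1 <= ts1 <= ts:
                    out.append((key, v1, value, ts1))
    return out


events = [("S1", "id1", "v1", 1), ("S2", "id1", "v2", 1)]
print(interval_join(events))  # [('id1', 'v1', 'v2', 1)]
```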

Now consider a similar situation for the Event Time Temporal Join:
SELECT S1.id, value1, value2, ts1 FROM S1 JOIN S2 FOR SYSTEM_TIME AS OF
S1.ts1 ON S1.id = S2.id

Let's take the same events:
S1, id1, v1, 1
S2, id1, v2, 1

Nothing happens, because neither stream's watermark has reached these events.
Now let's add
S2, id2, v2, 101
This should trigger the join for id1, because we have all the knowledge needed to
perform it (we know that the S2 watermark has passed the timestamp of the id1 record).
Unfortunately, to trigger the join on id1 we also need a watermark on the S1
side, and I think this behaviour is wrong.
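A toy Python model of the temporal-join behaviour described above, under stated assumptions: each stream's watermark is the maximum timestamp seen minus 1, the operator watermark is the minimum of the two, buffered S1 rows are joined against the latest S2 version with a timestamp <= ts1 once the operator watermark reaches them, and rows without a version are dropped (inner join). The extra row `("S1", "id9", "x", 5)` is hypothetical and only serves to advance the S1 watermark. This mirrors the behaviour discussed in the thread, not Flink's actual code.

```python
import math
from bisect import bisect_right


# Hypothetical toy model of an event-time temporal join
#   S1 JOIN S2 FOR SYSTEM_TIME AS OF S1.ts1 ON S1.id = S2.id
# Each stream's watermark is max(ts seen) - 1. Buffered S1 rows are joined
# only when the operator watermark min(wm_S1, wm_S2) reaches them.
def temporal_join(events):
    wm = {"S1": -math.inf, "S2": -math.inf}
    left = []      # buffered S1 rows: (id, value, ts)
    versions = {}  # S2 versions per id: sorted list of (ts, value)
    out = []
    for stream, key, value, ts in events:
        if stream == "S1":
            left.append((key, value, ts))
        else:
            versions.setdefault(key, []).append((ts, value))
            versions[key].sort()
        wm[stream] = max(wm[stream], ts - 1)
        op_wm = min(wm["S1"], wm["S2"])
        still_waiting = []
        for key1, v1, ts1 in left:
            if ts1 > op_wm:
                still_waiting.append((key1, v1, ts1))
                continue
            # Latest S2 version with version ts <= ts1 (inner join: skip if none).
            vs = versions.get(key1, [])
            i = bisect_right([t for t, _ in vs], ts1)
            if i > 0:
                out.append((key1, v1, vs[i - 1][1], ts1))
        left = still_waiting
    return out


events = [("S1", "id1", "v1", 1), ("S2", "id1", "v2", 1), ("S2", "id2", "v2", 101)]
print(temporal_join(events))                            # []
print(temporal_join(events + [("S1", "id9", "x", 5)]))  # [('id1', 'v1', 'v2', 1)]
```

The id1 row is held back until S1's own watermark moves, even though S2's watermark (100) is already well past ts = 1.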

I hope I explained everything correctly.

Regards,
Maciek

(In reply to Leonard Xu's message of Tuesday, 27 April 2021, 08:58.)


--
Maciek Bryński

Re: Watermarks in Event Time Temporal Join

Leonard Xu
Thanks for your example, Maciej

I can explain more about the design.
> Let's take the same events:
> S1, id1, v1, 1
> S2, id1, v2, 1
>
> Nothing happens, because neither stream's watermark has reached these events.
> Now let's add
> S2, id2, v2, 101
> This should trigger the join for id1, because we have all the knowledge needed to
> perform it (we know that the S2 watermark has passed the timestamp of the id1 record).

Based on this example, suppose the left stream events are
S1, id1, v1, 1
S1, id1, v2, 101
S1, id1, v3, 102
S1, id1, v4, 99    (assume the watermark is defined as `ts - 3`)
Should out-of-order data such as the late event v4 (ts 99) be ignored or not? The versioned table's watermark is based only on the versioned table's data, not on both sides, so it cannot represent the progress of the left stream. That is why we currently use the left stream's watermark to handle out-of-order data.
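A small Python sketch of the lateness check, assuming the left watermark is `max(ts seen) - 3` and, for simplicity, that a new watermark is generated after every row (in Flink, watermarks are usually generated periodically, so the exact outcome can depend on timing). The function `classify_left` is hypothetical.

```python
import math


# Hypothetical toy model: the left stream's watermark is max(ts seen) - 3,
# updated after every row. A row whose timestamp is <= the watermark already
# in effect when it arrives is late (the watermark promised no more such rows).
def classify_left(timestamps, delay=3):
    wm = -math.inf
    result = []
    for ts in timestamps:
        result.append((ts, "late" if ts <= wm else "on time"))
        wm = max(wm, ts - delay)
    return result


print(classify_left([1, 101, 102, 99]))
# [(1, 'on time'), (101, 'on time'), (102, 'on time'), (99, 'late')]
```

Only the left stream's own watermark can classify v4 this way; the versioned table's watermark carries no information about how far the left stream has progressed.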
 
Continuing with your example, assume the right stream events are
S2, id2, v1, 101
S2, id3, v1, 101
S2, id3, v2, 102
S2, id3, v3, 103
S2, id3, v4, 104
How do we clean up old version data, e.g. id3 (v1 to v3)? If you clean it up based only on the versioned table's watermark (e.g. a versioned table watermark of 105), then the left-stream rows (id3, 101) and (id3, 102) cannot find the correct version, right?
Currently, the left stream's watermark is used to clean up outdated data and to ensure that every row in the left stream can find its correct version.
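A minimal Python sketch of this cleanup rule, under the assumption that every future non-late left row has a timestamp greater than the cleanup watermark `wm`: for each key, keep the latest version with timestamp <= `wm` (it may still be the valid version for upcoming left rows) plus all newer versions. The function `cleanup_versions` and the left watermark value of 100 are hypothetical.

```python
# Hypothetical toy model of versioned-state cleanup for a single key.
# Assumption: future non-late left rows have ts > wm, so only the latest
# version with ts <= wm and any newer versions can still be looked up.
def cleanup_versions(versions, wm):
    # versions: list of (ts, value) for one key, sorted by ts
    older = [v for v in versions if v[0] <= wm]
    newer = [v for v in versions if v[0] > wm]
    return older[-1:] + newer


id3 = [(101, "v1"), (102, "v2"), (103, "v3"), (104, "v4")]

# Cleaning with the versioned table's own watermark (105) keeps only v4, so
# buffered left rows (id3, 101) and (id3, 102) lose the versions valid at
# their timestamps.
print(cleanup_versions(id3, wm=105))  # [(104, 'v4')]
# Cleaning with the left stream's watermark (say 100) keeps all four.
print(cleanup_versions(id3, wm=100))  # [(101, 'v1'), (102, 'v2'), (103, 'v3'), (104, 'v4')]
```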

Best,
Leonard