Hi,
I'm curious why the Event Time Temporal Join needs watermarks from both sides to perform the join. Shouldn't a watermark on the versioned table side be enough?

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi, maverick.

The watermark is used to determine whether a message is late or early. If we only use the watermark on the versioned table side, we have no means to determine whether an event in the main stream is ready to be emitted.

Best,
Shengkai

On Mon, Apr 26, 2021 at 2:31 AM, maverick <[hidden email]> wrote:
> Hi,
Hi Shengkai,
Thanks for the answer. The question is whether we need to determine if an event in the main stream is late at all. Let's look at the interval join: an event is emitted as soon as there is a match between the left and right streams.

I agree the watermark should pass on the versioned table side, because this is the only way to know which version of a record should be used. But if we mimic the behaviour of the interval join, then the main stream watermark could be skipped.

Regards,
Maciek

On Mon, 26 Apr 2021 at 06:14, Shengkai Fang <[hidden email]> wrote:
> The watermark is used to determine whether a message is late or early. If we only use the watermark on the versioned table side, we have no means to determine whether an event in the main stream is ready to be emitted.

--
Maciek Bryński
Hello, Maciej
IIRC, the rowtime interval join requires watermarks on both sides. In both the rowtime interval join and the rowtime temporal join, the watermark is used to clean up outdated data and to advance the data progress.

Best,
Leonard
Hi Leonard,
Let's assume we have two streams:

    S1 - id, value1, ts1 with watermark = ts1 - 1
    S2 - id, value2, ts2 with watermark = ts2 - 1

Then we have the following interval join:

    SELECT S1.id, value1, value2, ts1
    FROM S1 JOIN S2
    ON S1.id = S2.id AND ts1 BETWEEN ts2 - 1 AND ts2

Let's have these events:

    stream, id, value, ts
    S1, id1, v1, 1
    S2, id1, v2, 1

For these events and the interval join, Flink will emit an event in the output stream:

    id1, v1, v2, 1

despite the fact that the watermark has not been reached on either stream.

Now a similar situation for the Event Time Temporal Join:

    SELECT S1.id, value1, value2, ts1
    FROM S1 JOIN S2 FOR SYSTEM_TIME AS OF S1.ts1
    ON S1.id = S2.id

Let's have these events:

    S1, id1, v1, 1
    S2, id1, v2, 1

Nothing happens, as neither stream has reached the watermark. Now let's add:

    S2, id2, v2, 101

This should trigger the join for id1, because we have all the knowledge needed to perform this join (we know that the watermark for the id1 record was reached). Unfortunately, to trigger the join on id1 we also need a watermark on the S1 side, and I think this behaviour is wrong.

I hope I explained everything correctly.

Regards,
Maciek

On Tue, 27 Apr 2021 at 08:58, Leonard Xu <[hidden email]> wrote:
> IIRC, the rowtime interval join requires watermarks on both sides. In both the rowtime interval join and the rowtime temporal join, the watermark is used to clean up outdated data and to advance the data progress.

--
Maciek Bryński
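The temporal-join scenario above can be replayed in a small toy model. This is only a sketch, not Flink's actual operator: the class `ToyTemporalJoin` and its method names are hypothetical, and it assumes the join acts on the minimum of the two input watermarks, which is how a two-input operator normally tracks event time. It shows the behaviour Maciek describes: the id1 row stays buffered after the S2 watermark reaches 100 and is emitted only once the S1 watermark also advances.

```python
from collections import defaultdict


class ToyTemporalJoin:
    """Toy model of an event-time temporal join (hypothetical, not Flink code)."""

    def __init__(self):
        self.left_buffer = []               # pending (id, value, ts) rows from S1
        self.versions = defaultdict(list)   # id -> [(ts, value)] rows from S2
        self.left_wm = float("-inf")
        self.right_wm = float("-inf")
        self.output = []

    def on_left(self, key, value, ts):
        self.left_buffer.append((key, value, ts))

    def on_right(self, key, value, ts):
        self.versions[key].append((ts, value))

    def on_watermark(self, side, wm):
        if side == "left":
            self.left_wm = max(self.left_wm, wm)
        else:
            self.right_wm = max(self.right_wm, wm)
        # Assumption: the join only acts on the minimum of both watermarks.
        self._emit_ready(min(self.left_wm, self.right_wm))

    def _emit_ready(self, combined_wm):
        still_pending = []
        for key, value, ts in self.left_buffer:
            if ts > combined_wm:
                still_pending.append((key, value, ts))
                continue
            # Newest version whose timestamp is not after the left row's timestamp.
            candidates = [(vts, v) for vts, v in self.versions[key] if vts <= ts]
            if candidates:
                _, version_value = max(candidates)
                self.output.append((key, value, version_value, ts))
        self.left_buffer = still_pending


join = ToyTemporalJoin()
join.on_left("id1", "v1", 1)
join.on_watermark("left", 0)       # S1 watermark = ts1 - 1
join.on_right("id1", "v2", 1)
join.on_watermark("right", 0)      # S2 watermark = ts2 - 1
join.on_right("id2", "v2", 101)
join.on_watermark("right", 100)
emitted_before = list(join.output)  # S1 watermark is still 0

join.on_watermark("left", 100)      # hypothetical later progress on S1
emitted_after = list(join.output)
```

In this model `emitted_before` is empty and `emitted_after` holds the single joined row for id1, which matches the behaviour questioned in the message above.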
Thanks for your example, Maciej
I can explain more about the design.
Based on this example, suppose the left stream events are:

    S1, id1, v1, 1
    S1, id1, v2, 101
    S1, id1, v3, 102
    S1, id1, v4, 99   // assume the watermark is defined as `ts - 3`

For out-of-order data such as the late event v4 (99), the versioned table's watermark cannot tell us whether the event should be ignored. That watermark is based only on the versioned table's data, not on both sides, so it cannot represent the progress of the left stream. This is why we use the left stream's watermark to handle out-of-order data.

Continuing with your example, assume the right stream events are:

    S2, id2, v1, 101
    S2, id3, v1, 101
    S2, id3, v2, 102
    S2, id3, v3, 103
    S2, id3, v4, 104

How do we clean up the old versions, e.g. id3 (v1~v3)? If you clean them up based only on the versioned table's watermark (e.g. the versioned table's watermark is 105), the rows (id3, 101) and (id3, 102) from the left stream cannot find their correct version, right? So the left stream's watermark is also used to clean up outdated data, which ensures that every row in the left stream can find its correct version.

Best,
Leonard
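Both points in Leonard's reply can be checked with a few lines of Python. This is a hedged sketch rather than Flink's real cleanup logic: the helper names `find_late_rows`, `prune_versions` and `lookup` are hypothetical, a row is treated as late when its timestamp is not greater than the current watermark, and the left watermark of 100 is an assumed value chosen so that the left rows at 101 and 102 are still pending.

```python
def find_late_rows(rows, delay):
    """Return values of rows whose timestamp is at or below the watermark so far."""
    watermark = float("-inf")
    late = []
    for value, ts in rows:
        if ts <= watermark:
            late.append(value)
        watermark = max(watermark, ts - delay)  # watermark defined as ts - delay
    return late


def prune_versions(versions, watermark):
    """Keep the newest version at or before `watermark`, plus all newer ones."""
    older = [v for v in versions if v[0] <= watermark]
    newer = [v for v in versions if v[0] > watermark]
    return ([max(older)] if older else []) + newer


def lookup(versions, ts):
    """Value of the newest version with timestamp <= ts, or None if absent."""
    candidates = [v for v in versions if v[0] <= ts]
    return max(candidates)[1] if candidates else None


# Left stream from the message above, watermark = ts - 3.
left_rows = [("v1", 1), ("v2", 101), ("v3", 102), ("v4", 99)]
late = find_late_rows(left_rows, delay=3)

# Right stream versions for id3 as (ts, value); right watermark is 105.
id3_versions = [(101, "v1"), (102, "v2"), (103, "v3"), (104, "v4")]
right_wm = 105
left_wm = 100  # assumed: left rows at 101 and 102 are not yet processed

pruned_right_only = prune_versions(id3_versions, right_wm)
pruned_both = prune_versions(id3_versions, min(left_wm, right_wm))

lost = [lookup(pruned_right_only, ts) for ts in (101, 102)]
found = [lookup(pruned_both, ts) for ts in (101, 102)]
```

Only v4 is flagged as late, and it is the left watermark that reveals this. Pruning on the right watermark alone leaves just the version at 104, so the lookups for 101 and 102 return nothing, while pruning on the minimum of both watermarks keeps every version those rows need.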