Issues with Watermark generation after join

Issues with Watermark generation after join

Dominik Wosiński
Hey,
I have noticed some weird behavior in a job I am currently working on. I have 4 different streams from Kafka; let's call them A, B, C and D. The idea is that I first do an SQL join of A & B on some field, then create an append stream from the joined A & B; let's call it E. Then I need to assign timestamps to E, since it is the result of a join and Flink can't figure out the timestamps.

Next, I union E & C to create a stream F. Finally, I connect E & C using `keyBy` and a CoProcessFunction. The issue I am facing is that this works fine if I force the parallelism of E to 1 by calling setParallelism. But if the parallelism is higher than 1, the watermark does not progress correctly for the same data. I can see that the CoProcessFunction methods are invoked and that data is produced, but the watermark never progresses for this function; it is always equal to (0 - allowedOutOfOrderness). I can see that timestamps are extracted correctly, and when I add debug prints I can actually see that watermarks are generated for all streams, but for some reason, if the parallelism is > 1, they never progress up to the connect function. Is there anything that needs to be done after SQL joins that I don't know of?

Best Regards,
Dom.

Re: Issues with Watermark generation after join

Theo
Hi Dominik,

I had the same problem once with a custom ProcessFunction. My ProcessFunction buffered the data for a while and then emitted it again. As a process function can do anything with the data (transforming, buffering, aggregating...), I think it's simply not safe for Flink to reason about the watermark of its output.

I solved all my issues by calling `assignTimestampsAndWatermarks` directly after the (co-)process function.
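Theo's workaround re-runs timestamp and watermark assignment on the output of the process function. As a rough, dependency-free illustration, a bounded-out-of-orderness assigner remembers the largest timestamp it has seen and reports that value minus a fixed bound as its watermark. The class below is a hypothetical model, not Flink's `BoundedOutOfOrdernessTimestampExtractor`; it only shows why a fresh assigner placed after the process function follows whatever that function actually emits, and why an assigner instance that sees no records never advances.

```java
// Hypothetical, dependency-free model of a bounded-out-of-orderness watermark
// assigner (not Flink's actual class). The watermark trails the largest
// timestamp seen so far by a fixed bound.
final class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrderness;
    private long maxTimestamp;
    private boolean seenRecord = false;

    BoundedOutOfOrdernessModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Called for every record that passes through this assigner instance.
    void onRecord(long timestamp) {
        maxTimestamp = seenRecord ? Math.max(maxTimestamp, timestamp) : timestamp;
        seenRecord = true;
    }

    // Called periodically: trail the largest timestamp seen by the bound.
    // An instance that has seen no records reports Long.MIN_VALUE (no progress).
    long currentWatermark() {
        return seenRecord ? maxTimestamp - maxOutOfOrderness : Long.MIN_VALUE;
    }
}
```

For example, with a bound of 5, seeing timestamps 100 and then 90 yields a watermark of 95, and a later record at 120 moves it to 115. The late record at 90 does not pull the watermark backwards.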

Best regards
Theo


In reply to the post by Dominik Wosiński of Monday, 16 March 2020, 16:55:18.


Re: Issues with Watermark generation after join

Dominik Wosiński
Actually, I only put this process function there for debugging purposes. My main goal is to join E & C using a temporal table function, but I observed exactly the same behavior there, i.e. when the parallelism was > 1 there was no output, and when I set it to 1 the output was generated. So I switched to a process function to see whether the watermarks reach this stage.

Best Regards,
Dom.

In reply to the post by Theo Diefenthal of Monday, 16 March 2020, 19:46.

Re: Issues with Watermark generation after join

Kurt Young
Hi, could you share the SQL you wrote for your original purpose, not the version where you attached a ProcessFunction for debugging?

Best,
Kurt


In reply to the post by Dominik Wosiński of Tuesday, 17 March 2020, 3:08 AM.

Re: Issues with Watermark generation after join

Dominik Wosiński
Hey, sure.
The original temporal table SQL is:

SELECT e.*, f.level AS level
FROM enablers AS e,
  LATERAL TABLE (Detectors(e.timestamp)) AS f
WHERE e.id = f.id

And the earlier SQL query that joins A & B is something like:

SELECT *
FROM A te,
  B s
WHERE s.id = te.id AND s.level = te.level AND s.timestamp = te.timestamp

Also, if I replace the SQL join of A & B with a BroadcastProcessFunction, it works like a charm and everything is calculated correctly, even without changing the parallelism.

I have noticed one more weird behavior. After the temporal table join I have a windowing function to process the data, and I have two options. In the temporal table function query I can select the rowtime attribute (of type Timestamp) and assign it to a field in the output class; this automatically passes the timestamp on, so I don't need to assign it again. Alternatively, I can select just a Long field that is not marked as rowtime (it actually holds the same value, but it was not declared with .rowtime), and then I need to assign timestamps and watermarks again, since Flink doesn't know what the timestamp is. The former solution works like a charm, but with the latter there is no output at all from the windowing function. I expected both solutions to behave exactly the same and pass the timestamps in the same manner, but apparently they don't.

Best Regards,
Dom.

Re: Issues with Watermark generation after join

Dominik Wosiński
In reply to this post by Kurt Young
I have created a simple minimal reproducible example that shows what I am talking about:
https://github.com/DomWos/FlinkTTF/tree/sql-ttf

It contains a test showing that, even when the input data arrives in order (enforced by multiple sleeps), there is no output for parallelism > 1, while for parallelism == 1 the output is produced normally.

Best Regards,
Dom.

Re: Issues with Watermark generation after join

Timo Walther
Hi Dominik,

The big conceptual difference between the DataStream API and the Table
API is that record timestamps are part of the schema in the Table API,
whereas in the DataStream API they are attached internally to each
record. When you call `y.rowtime` during a stream-to-table conversion,
the runtime extracts the internal timestamp and copies it into the
field `y`.

Even though the timestamp is no longer stored internally, Flink makes
sure that the watermarking (which still happens internally) remains
valid. However, this means that timestamps and watermarks must already
be correct when entering the Table API. If they were not correct
before, they will also not trigger time-based operations correctly.

If there is no output with a parallelism > 1, this usually means that
one source partition has not emitted a watermark, so event time cannot
progress globally for the job:

watermark of operator = min(watermark of previous operator partition 1,
                            watermark of previous operator partition 2, ...)
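The min rule above can be made concrete with a small simulation. The sketch below is a simplified, dependency-free model (the class name is hypothetical, and real Flink additionally handles details such as idle input channels): an operator remembers the latest watermark received from each upstream parallel instance and uses the minimum as its own event-time clock, so a single instance that never sends a watermark holds everything back.

```java
import java.util.Arrays;

// Hypothetical model of how an operator with several input channels derives
// its event-time clock: it tracks the latest watermark per upstream parallel
// instance, and its own watermark is the minimum across all of them.
final class InputWatermarkMerger {
    private final long[] channelWatermarks;

    InputWatermarkMerger(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    // Record a watermark from one upstream instance; watermarks only move forward.
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    // The slowest upstream instance determines the operator's watermark.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }
}
```

With two channels, a watermark of 1000 on channel 0 alone leaves the operator at `Long.MIN_VALUE`. Once channel 1 reports 400 the operator moves to 400, and it reaches 1000 only after channel 1 passes 1000.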

I hope this helps.

Regards,
Timo


In reply to the post by Dominik Wosiński of 19 March 2020, 16:38.


Re: Issues with Watermark generation after join

Dominik Wosiński
Hey Timo,
Thanks a lot for this answer! I have mostly been using the DataStream API, so it's good to know the difference.
I have some follow-up questions, and I would be glad for clarification:

1) For the SQL join operator, is the "partition" the parallel instance of the operator, or is it the table partitioning as defined by partitionBy?
2) Assuming that it is the parallel operator instance, does this mean that we need output from ALL parallel instances for the watermark to progress and the output to be generated?

Best Regards,
Dom.

In reply to the post by Timo Walther of Tuesday, 24 March 2020, 10:01.

Re: Issues with Watermark generation after join

Timo Walther
Hi,

1) Yes, by "partition" I meant "parallel instance".

If the watermarking is correct in the DataStream API, the Table API and
SQL will make sure that it remains correct. For example, you can only
perform a TUMBLE window if the timestamp column has not lost its time
attribute property. A regular JOIN (not a time-versioned one) does not
work with watermarks, so its result no longer has time attributes, and
a subsequent TUMBLE window will fail with an exception.

2) You don't need output. Most operators handle the watermarking logic.
But for sources, you need output from all sources in order to have
progress in event time.

Regards,
Timo


In reply to the post by Dominik Wosiński of 24 March 2020, 12:21.


Re: Issues with Watermark generation after join

Timo Walther
Or, more precisely: "But for sources, you need to emit a watermark from
all sources in order to have progress in event time."
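Putting the two ideas together, here is one scenario that would match the symptoms reported in this thread. It is an assumption, not a diagnosis confirmed by the participants: if timestamps are assigned after the join with parallelism > 1, each parallel assigner only sees the join results routed to its own instance. If some instance receives no results (for example because all records share one join key), its watermark never advances and the downstream minimum stays stuck, whereas with parallelism 1 the single assigner sees everything. The Java model below is hypothetical and dependency-free; its routing (`hashCode` modulo parallelism) is a simplification of Flink's key-group assignment. An assigner that started its maximum timestamp at 0 instead of reporting `Long.MIN_VALUE` would instead be stuck at 0 minus the bound, the value Dominik observed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical end-to-end model (not Flink code): join results are routed by
// key to `parallelism` instances, each instance runs its own
// bounded-out-of-orderness assigner, and a downstream operator takes the
// minimum of the per-instance watermarks.
final class ParallelWatermarkScenario {

    // One parallel instance of a bounded-out-of-orderness assigner.
    static final class Assigner {
        private final long bound;
        private long maxTimestamp;
        private boolean seenRecord = false;

        Assigner(long bound) {
            this.bound = bound;
        }

        void onRecord(long timestamp) {
            maxTimestamp = seenRecord ? Math.max(maxTimestamp, timestamp) : timestamp;
            seenRecord = true;
        }

        // An instance that never receives a record never advances.
        long watermark() {
            return seenRecord ? maxTimestamp - bound : Long.MIN_VALUE;
        }
    }

    // keys[i] and timestamps[i] describe the i-th join result. Returns the
    // watermark a downstream operator would hold after all records are processed.
    static long downstreamWatermark(String[] keys, long[] timestamps, int parallelism, long bound) {
        List<Assigner> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Assigner(bound));
        }
        for (int i = 0; i < keys.length; i++) {
            // Simplified routing: hash the key onto one of the parallel instances.
            int target = Math.floorMod(keys[i].hashCode(), parallelism);
            instances.get(target).onRecord(timestamps[i]);
        }
        // Downstream operator: minimum over all upstream instances.
        long min = Long.MAX_VALUE;
        for (Assigner instance : instances) {
            min = Math.min(min, instance.watermark());
        }
        return min;
    }
}
```

Three records with the same key and timestamps 100, 200 and 300, with a bound of 10, give a downstream watermark of 290 at parallelism 1 but `Long.MIN_VALUE` at parallelism 2, because one of the two instances never receives a record.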


In reply to the post by Timo Walther of 24 March 2020, 13:09.
