Hey,

I have noticed a weird behavior with a job that I am currently working on. I have 4 different streams from Kafka; let's call them A, B, C and D. The idea is that I first do an SQL join of A & B on some field, then I create an append stream from the joined A & B; let's call it E. Then I need to assign timestamps to E, since it is the result of a join and Flink can't figure out the timestamps. Next, I union E & C to create some stream F. Then, finally, I connect E & C using `keyBy` and a CoProcessFunction.

The issue I am facing is this: it works fine if I enforce a parallelism of 1 for E by invoking `setParallelism`. But if the parallelism is higher than 1, then for the same data the watermark does not progress correctly. I can see that the CoProcessFunction methods are invoked and that data is produced, but the watermark never progresses for this function; it is always equal to (0 - allowedOutOfOrderness). I can see that timestamps are extracted correctly, and when I add debug prints I can actually see that watermarks are generated for all streams, but for some reason, if the parallelism is > 1, they never progress up to the connect function.

Is there anything that needs to be done after SQL joins that I don't know of?

Best Regards,
Dom.
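To make the symptom concrete, here is a toy model in plain Python (not Flink code) of a bounded-out-of-orderness timestamp assigner, one object per parallel instance. Each instance emits roughly the largest timestamp it has seen minus the allowed out-of-orderness; exact details such as off-by-one handling and the initial value vary between Flink versions, and the class name and the `MIN_WATERMARK` sentinel are hypothetical. What it shows is that an instance which never receives a record never advances its watermark.

```python
MIN_WATERMARK = -(2 ** 63)  # stand-in for Java's Long.MIN_VALUE ("no progress yet")


class BoundedOutOfOrdernessGenerator:
    """Toy per-instance watermark generator; hypothetical, not Flink's own class."""

    def __init__(self, allowed_out_of_orderness_ms):
        self.bound = allowed_out_of_orderness_ms
        self.max_timestamp = None  # no record seen by this instance yet

    def on_event(self, timestamp_ms):
        # Remember the largest event timestamp this instance has observed.
        if self.max_timestamp is None or timestamp_ms > self.max_timestamp:
            self.max_timestamp = timestamp_ms

    def current_watermark(self):
        # Without any record, the instance has nothing to advance its clock with.
        if self.max_timestamp is None:
            return MIN_WATERMARK
        return self.max_timestamp - self.bound


busy = BoundedOutOfOrdernessGenerator(allowed_out_of_orderness_ms=5_000)
idle = BoundedOutOfOrdernessGenerator(allowed_out_of_orderness_ms=5_000)
for ts in (10_000, 12_000, 11_000):  # 11_000 arrives out of order
    busy.on_event(ts)

print(busy.current_watermark())  # 7000
print(idle.current_watermark())  # -9223372036854775808
```

The out-of-order record does not pull the watermark back, and the idle instance stays at its initial value however much data the other instance processes.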
Hi Dominik,

I had the same issue once with a custom ProcessFunction. My ProcessFunction buffered the data for a while and then emitted it again. As a process function can do anything with the data (transforming, buffering, aggregating...), I think it's just not safe for Flink to reason about the watermark of the output. I solved all my issues by calling `assignTimestampsAndWatermarks` directly after the (co-)process function.

Best regards,
Theo
Actually, I just put this process function there for debugging purposes. My main goal is to join E & C using a Temporal Table Function, but I have observed exactly the same behavior, i.e. when the parallelism was > 1 there was no output, and when I set it to 1 the output was generated. So I switched to a process function to see whether the watermarks reach this stage.

Best Regards,
Dom.

On Mon, 16 Mar 2020 at 19:46, Theo Diefenthal <[hidden email]> wrote:
Hi,

could you share the SQL you wrote for your original purpose, not the one to which you attached the ProcessFunction for debugging?

Best,
Kurt

On Tue, Mar 17, 2020 at 3:08 AM Dominik Wosiński <[hidden email]> wrote:
And the previous SQL query to join A & B is something like:

Also, if I replace the SQL join of A & B with a BroadcastProcessFunction, it works like a charm and everything is calculated correctly, even if I don't change the parallelism.

I have noticed one more weird behavior. After the temporal table join I have a windowing function to process the data, and I have two options. In the TTF I can select the rowtime with type Timestamp and assign it to a field in the output class; this automatically passes the timestamp along, so I don't need to assign it again. But I could also select just a Long field that is not marked as rowtime (it actually has the same value, but this field was not marked with `.rowtime` on declaration), and then I need to assign the timestamps and watermarks again, since Flink doesn't know what the timestamp is. The former solution works like a charm, but for the latter there is actually no output visible from the windowing function. My expectation is that both solutions should work exactly the same and pass the timestamps in the same manner, but apparently they don't.

Best Regards,
Dom.
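Why a stalled watermark shows up as "no output" from a window: in event time, a tumbling window only fires once the watermark passes the end of the window. The plain-Python toy below (hypothetical names, not Flink's WindowOperator) mirrors that firing rule, treating window end minus one as the last timestamp of a window, as Flink's event-time trigger does.

```python
from collections import defaultdict


def window_start(timestamp_ms, size_ms):
    # Align a timestamp down to the start of its tumbling window (offset 0).
    return timestamp_ms - (timestamp_ms % size_ms)


class ToyTumblingWindow:
    """Hypothetical event-time tumbling window that counts records per window."""

    def __init__(self, size_ms):
        self.size = size_ms
        self.pending = defaultdict(int)  # window start -> record count

    def on_record(self, timestamp_ms):
        self.pending[window_start(timestamp_ms, self.size)] += 1

    def on_watermark(self, watermark_ms):
        # Window [start, start + size) fires once the watermark reaches its
        # last timestamp, start + size - 1.
        ready = [s for s in sorted(self.pending) if watermark_ms >= s + self.size - 1]
        return [(s, self.pending.pop(s)) for s in ready]


window = ToyTumblingWindow(size_ms=10_000)
for ts in (1_000, 4_000, 12_000):
    window.on_record(ts)

print(window.on_watermark(-(2 ** 63)))  # [] (a stalled watermark fires nothing)
print(window.on_watermark(9_999))       # [(0, 2)]
print(window.on_watermark(15_000))      # [] (window starting at 10000 still open)
```

Records can keep arriving, but if the watermark never moves past the end of a window, that window never fires, which is consistent with the "no output visible from the windowing function" case.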
I have created a simple, minimal reproducible example that shows what I am talking about:
https://github.com/DomWos/FlinkTTF/tree/sql-ttf

It contains a test that shows that even if the output is in order (which is enforced by multiple sleeps), for parallelism > 1 there is no output, while for parallelism == 1 the output is produced normally.

Best Regards,
Dom.
Hi Dominik,
the big conceptual difference between the DataStream and Table APIs is that record timestamps are part of the schema in the Table API, whereas they are attached internally to each record in the DataStream API. When you call `y.rowtime` during a stream-to-table conversion, the runtime extracts the internal timestamp and copies it into the field `y`.

Even though the timestamp is no longer stored internally, Flink makes sure that the watermarking (which still happens internally) remains valid. However, this means that timestamps and watermarks must already be correct when entering the Table API. If they were not correct before, they will also not trigger time-based operations correctly.

If there is no output for a parallelism > 1, this usually means that one source partition has not emitted a watermark, which is needed for the job to make progress globally:

watermark of operator = min(previous operator partition 1, previous operator partition 2, ...)

I hope this helps.

Regards,
Timo
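Timo's formula can be played through with a small plain-Python toy model (hypothetical function names; the `key % parallelism` routing is a simplification, not Flink's key-group hashing). It assumes, as one plausible reading of Dom's setup, that the timestamp assigner after the join runs with the same parallelism as the join and that each assigner instance only sees the records routed to it.

```python
MIN_WATERMARK = -(2 ** 63)  # stand-in for Java's Long.MIN_VALUE


def combined_watermark(input_watermarks):
    # An operator's event-time clock is the minimum over all of its inputs.
    return min(input_watermarks)


def downstream_watermark(parallelism, records, bound_ms):
    """Route (key, timestamp) records to parallel timestamp-assigner instances
    and return the watermark the next operator would see (toy model)."""
    max_ts = [None] * parallelism
    for key, ts in records:
        instance = key % parallelism  # hypothetical routing, not Flink's key hashing
        if max_ts[instance] is None or ts > max_ts[instance]:
            max_ts[instance] = ts
    # Instances that never received a record stay at MIN_WATERMARK.
    per_instance = [MIN_WATERMARK if m is None else m - bound_ms for m in max_ts]
    return combined_watermark(per_instance)


records = [(1, 10_000), (1, 20_000), (3, 30_000)]  # only keys 1 and 3 occur
print(downstream_watermark(1, records, bound_ms=5_000))  # 25000
print(downstream_watermark(4, records, bound_ms=5_000))  # -9223372036854775808
```

With parallelism 1, every record reaches the single instance and the watermark advances to 25000; with parallelism 4, instances 0 and 2 never see a record, so the minimum stays pinned at `MIN_WATERMARK` no matter how much data flows through instances 1 and 3.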
Hey Timo,

Thanks a lot for this answer! I was mostly using the DataStream API, so it's good to know the difference. I have some follow-up questions, and I would be glad for clarification:

1) For the SQL join operator, is the partition the parallel instance of the operator, or is it the table partitioning as defined by partitionBy?
2) Assuming that it is the parallel instance of the operator, does this mean that we need output from ALL operators for the watermark to progress and for output to be generated?

Best Regards,
Dom.

On Tue, 24 Mar 2020 at 10:01, Timo Walther <[hidden email]> wrote:
Hi,
1) Yes, by "partition" I meant "parallel instance".

If the watermarking is correct in the DataStream API, the Table API and SQL will take care that it remains correct. E.g. you can only perform a TUMBLE window if the timestamp column has not lost its time attribute property. A regular JOIN (not time-versioned) does not work with watermarks, so its result will not have time attributes anymore, and a subsequent TUMBLE window will fail with an exception.

2) You don't need output. Most operators deal with the watermarking logic themselves. But for sources, you need output from all sources in order to have progress in event time.

Regards,
Timo
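Timo's second point, that non-source operators keep forwarding watermarks even when they emit no records, can be illustrated with a toy filter in plain Python (a hypothetical class, not a Flink operator): it drops every record, yet the watermarks still reach the next operator.

```python
class ToyFilter:
    """Hypothetical filter operator: may drop records, always forwards watermarks."""

    def __init__(self, predicate):
        self.predicate = predicate

    def process(self, element):
        kind, value = element
        if kind == "watermark":
            return [element]  # watermarks pass through regardless of records
        return [element] if self.predicate(value) else []


stream = [("record", 1_000), ("watermark", 900), ("record", 2_000), ("watermark", 1_900)]
drop_everything = ToyFilter(lambda ts: False)
output = [out for element in stream for out in drop_everything.process(element)]
print(output)  # [('watermark', 900), ('watermark', 1900)]
```

Downstream event time still advances to 1900 even though not a single record got through, which is why only the sources, where watermarks originate, must each emit them.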
Or better: "But for sources, you need to emit a watermark from all sources in order to have progress in event time."