I have an issue with tumbling windows running in parallel. I run a job on a set of CSV files. When the parallelism is set to 1, I get the proper results. When it runs in parallel, I get no output. Is it because the parallel streams take the MAX(watermark) from all the parallel sources, and only one of the streams advances the watermark? It seems wrong that the result is not deterministic and depends on the level of parallelism.
What am I doing wrong?
|
Hi,
Can you share a few more details about the data source? Are you continuously ingesting files from a folder?
You are correct that the parallelism should not affect the results, but there are a few things that can affect that:
1) non-deterministic keys
2) out-of-order data with inappropriate watermarks
Note that watermark configuration for file ingests can be difficult and that you need to ensure that files are read in the "right" order. AFAIK, Flink's continuous file source uses the modification timestamp of files to determine the read order.
Best, Fabian
On Sun, 25 Aug 2019 at 19:32, Hanan Yehudai <[hidden email]> wrote:
|
The data source is generated by an application that monitors some sort of sessions, with the EVENT_TIME column being the session end time. It is possible that the files will contain out-of-order data because of the asynchronous nature of the application writing the files. While the EVENT_TIME is monotonically increasing in general, some lateness is possible. However, I used allowedLateness on my stream and still got the inconsistencies.
Although the real-life use case is generically reading files from a folder, the testing environment has a set of files prepared in advance; these should be read and produce the result.
You mentioned the “right” order of the files. Is it sorted by update time? When running in parallel, is it possible that two files will be read in parallel and, if the later one is smaller, that the latest timestamp will be handled first?
BTW, I tried to use a ContinuousEventTimeTrigger to make sure the window is calculated, and got the processing to trigger multiple times, so I'm not sure exactly how this type of trigger works.
Thanks
From: Fabian Hueske <[hidden email]>
|
Hi,
The paths of the files to read are distributed across all reader / source tasks, and each task reads the files in order of their modification timestamp. The watermark generator is not aware of any files and just looks at the stream of records produced by the source tasks. You need to choose the WM generator strategy such that you minimize the number of late records.
I'd recommend first investigating how many late records you are dealing with. You can use a custom ProcessFunction and compare the timestamp of each record with the current watermark.
AllowedLateness is also not a magical cure. It will just emit updates downstream, i.e., you need to remove the results that were updated by a more complete result.
Best, Fabian
On Mon, 26 Aug 2019 at 10:21, Hanan Yehudai <[hidden email]> wrote:
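The late-record check suggested above can be modelled without any Flink dependency. The sketch below is only an illustration: the class name, the method names, and the "largest timestamp seen minus a fixed bound" watermark strategy are all assumptions, not the configuration used in this thread. In a real job, the two values being compared would come from ctx.timestamp() and ctx.timerService().currentWatermark() inside a ProcessFunction.

```java
import java.util.Arrays;
import java.util.List;

/** Flink-free model of a late-record check; all names here are hypothetical. */
class LateRecordCounter {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;
    private long lateCount = 0;

    LateRecordCounter(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Assumed strategy: the watermark trails the largest timestamp seen by a fixed bound. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no record seen yet
        }
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    /** Returns true if the record is late, i.e. its timestamp is at or behind the watermark. */
    boolean process(long timestamp) {
        boolean late = timestamp <= currentWatermark();
        if (late) {
            lateCount++;
        }
        // Update the maximum only after the check, so a record is never compared with itself.
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
        return late;
    }

    long lateCount() {
        return lateCount;
    }

    /** Convenience helper: feeds all timestamps in order and returns the number of late ones. */
    static long countLate(List<Long> timestamps, long maxOutOfOrdernessMs) {
        LateRecordCounter counter = new LateRecordCounter(maxOutOfOrdernessMs);
        for (long ts : timestamps) {
            counter.process(ts);
        }
        return counter.lateCount();
    }

    public static void main(String[] args) {
        List<Long> ts = Arrays.asList(1000L, 2000L, 1500L, 5000L, 3000L, 4900L);
        System.out.println("late with 1000 ms bound: " + countLate(ts, 1000));
        System.out.println("late with 0 ms bound: " + countLate(ts, 0));
    }
}
```

Running the same input with different bounds gives a rough idea of how much out-of-orderness the watermark strategy has to tolerate before records start to count as late.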
|
You said, “You can use a custom ProcessFunction and compare the timestamp of each record with the current watermark.” Does the window process function have all the events, even the ones that are dropped due to lateness?
Also, when looking at Flink's monitoring page, I see different values for the watermarks even after all my files were processed, which is something I would not expect.
Thanks
|
I would use a regular ProcessFunction, not a WindowProcessFunction.
The final WM depends on how the records were partitioned at the watermark assigner (and on the assigner itself). AFAIK, the distribution of files to source reader tasks is not deterministic. Hence, the final WM changes from run to run.
Fabian
On Mon, 26 Aug 2019 at 12:16, Hanan Yehudai <[hidden email]> wrote:
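A small Flink-free model may help to see why the final watermark can differ between runs. It assumes that each task's watermark is simply the largest timestamp it has read, and it relies on the fact that an operator with several inputs follows the minimum of its input watermarks. The class name, method names, and file contents are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Flink-free model of how file distribution affects the final watermark; names are hypothetical. */
class FinalWatermarkModel {

    /** Assumed per-task watermark: largest timestamp read, or Long.MIN_VALUE if nothing was read. */
    static long taskWatermark(List<Long> timestamps) {
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return max;
    }

    /** The downstream watermark is the minimum over all upstream task watermarks. */
    static long downstreamWatermark(List<List<Long>> recordsPerTask) {
        long min = Long.MAX_VALUE;
        for (List<Long> records : recordsPerTask) {
            min = Math.min(min, taskWatermark(records));
        }
        return min;
    }

    public static void main(String[] args) {
        // Three hypothetical files A, B, C with timestamps 100..200, 300..400, 500..600.
        List<List<Long>> runOne = Arrays.asList(
                Arrays.asList(100L, 200L, 500L, 600L), // task 0 reads A and C
                Arrays.asList(300L, 400L));            // task 1 reads B
        List<List<Long>> runTwo = Arrays.asList(
                Arrays.asList(100L, 200L),             // task 0 reads A
                Arrays.asList(300L, 400L, 500L, 600L)); // task 1 reads B and C
        List<List<Long>> idleTask = Arrays.asList(
                Arrays.asList(100L, 200L, 300L, 400L, 500L, 600L), // task 0 reads everything
                Collections.<Long>emptyList());                     // task 1 reads nothing
        System.out.println("run one:   " + downstreamWatermark(runOne));
        System.out.println("run two:   " + downstreamWatermark(runTwo));
        System.out.println("idle task: " + downstreamWatermark(idleTask));
    }
}
```

In this model the same three files yield different final watermarks depending on which task reads which file, and a task that receives no records keeps the downstream watermark at Long.MIN_VALUE, so no event-time window would ever fire.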
|
I'm not sure what you mean by using a process function and not a window process function, as the window operator takes a window process function.
|
I meant to not use Flink's built-in windows at all but implement your logic in a KeyedProcessFunction. So basically:
myDataStream.keyBy(...).process(new MyKeyedProcessFunction)
instead of:
myDataStream.keyBy(...).window(...).process(new MyWindowProcessFunction)
On Mon, 2 Sept 2019 at 15:29, Hanan Yehudai <[hidden email]> wrote:
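The kind of logic that could go into such a KeyedProcessFunction can be sketched in plain Java. This is a Flink-free model under stated assumptions: processElement stands in for the per-record callback, advanceWatermark stands in for event-time timers firing, and the class, its methods, and the "key@windowStart=count" output format are all hypothetical. Unlike the built-in window operator, the hand-written logic sees late records and can count them instead of dropping them silently.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Flink-free model of hand-written tumbling-window counting per key; names are hypothetical. */
class ManualTumblingWindow {
    private final long windowSizeMs;
    // key -> (window start -> record count); TreeMaps keep the output order deterministic.
    private final TreeMap<String, TreeMap<Long, Long>> state = new TreeMap<>();
    private final List<String> output = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;
    private long lateRecords = 0;

    ManualTumblingWindow(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Adds one record to its window, or counts it as late if that window has already fired. */
    void processElement(String key, long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, windowSizeMs);
        long lastTimestampInWindow = windowStart + windowSizeMs - 1;
        if (lastTimestampInWindow <= watermark) {
            lateRecords++; // visible to the application, e.g. for logging or a side channel
            return;
        }
        state.computeIfAbsent(key, k -> new TreeMap<>()).merge(windowStart, 1L, Long::sum);
    }

    /** Emits and clears every window whose last timestamp is at or behind the new watermark. */
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        for (Map.Entry<String, TreeMap<Long, Long>> perKey : state.entrySet()) {
            Iterator<Map.Entry<Long, Long>> it = perKey.getValue().entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> window = it.next();
                if (window.getKey() + windowSizeMs - 1 > watermark) {
                    break; // windows are sorted by start, so later ones are not complete either
                }
                output.add(perKey.getKey() + "@" + window.getKey() + "=" + window.getValue());
                it.remove();
            }
        }
    }

    List<String> output() {
        return output;
    }

    long lateRecords() {
        return lateRecords;
    }

    public static void main(String[] args) {
        ManualTumblingWindow w = new ManualTumblingWindow(1000);
        w.processElement("a", 100);
        w.processElement("a", 900);
        w.processElement("b", 1500);
        w.advanceWatermark(999);
        w.processElement("a", 500); // arrives after its window fired
        w.advanceWatermark(1999);
        System.out.println(w.output() + ", late records: " + w.lateRecords());
    }
}
```

In an actual KeyedProcessFunction the per-key map would live in keyed state and the firing would be driven by registered event-time timers; the model above only shows the bookkeeping.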
|