Hi All,
I am using categoryID as the keyBy attribute to create a keyed stream from a product event stream. The keyed stream then creates time windows for each category. However, when the window time expires, I want to write the output data of all products in all categories collectively, in a single atomic operation. Is there a way to call a single sink function for all windows with the same start and end time? Or is there a way in Flink to know that all windows with the same end time have finished processing their sink function? Currently, each window calls the sink function individually.
Cheers,
|
Hi,
One possible approach is to put a process function before the sink. A process function is aware of watermarks, so it can collect and buffer window results until it sees a watermark. That is the signal that all results for windows with an end time smaller than the watermark are complete. They can then be aggregated and sent to the sink.
Best,
Stefan
> On 22.06.2017 at 15:15, Ahmad Hassan <[hidden email]> wrote:
|
Thanks Stefan,
But how will the process function get these watermarks? I have sliding windows like below:
final DataStream<WindowStats> eventStream = inputStream
    .keyBy(TENANT, CATEGORY)
    .window(SlidingProcessingTimeWindows.of(Time.minutes(100), Time.minutes(5)))
    .fold(new WindowStats(), new ProductAggregationMapper(), new ProductAggregationWindowFunction());
Window results arrive every 5 minutes after the first window output. How would the process function know that all the windows for a tenant have finished for a given start and end time?
Thanks for the help.
Cheers,
On 22 June 2017 at 14:37, Stefan Richter <[hidden email]> wrote:
|
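To make the window setup above concrete, here is a minimal sketch of how a sliding window assigner maps one timestamp to its windows, which is why a 100-minute window sliding by 5 minutes yields a result every 5 minutes. It is plain Java with no Flink dependency. The class `SlidingWindowSketch` and the method `assignWindows` are hypothetical names, and the arithmetic assumes windows aligned to the epoch with no offset. For processing-time windows the timestamp would be the wall-clock time at which the element is processed.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of sliding-window assignment; it does not use Flink classes.
class SlidingWindowSketch {

    /** Returns {start, end} pairs (end exclusive) of every window that contains the timestamp. */
    static List<long[]> assignWindows(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        List<long[]> windows = assignWindows(1_000 * minute + 1, 100 * minute, 5 * minute);
        System.out.println(windows.size() + " windows, latest ends at minute "
                + windows.get(0)[1] / minute);
    }
}
```

With a size of 100 minutes and a slide of 5 minutes, each event falls into 20 overlapping windows, so the same event contributes to 20 window results that fire 5 minutes apart.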
The process function has the signature
void processElement(I value, Context ctx, Collector<O> out) throws Exception
where the context provides access to the current watermark, and you can also register timer callbacks that trigger when a certain watermark is reached. You can simply monitor the watermark through the context for each incoming window result. The start time is not important: once you see a watermark in the context, you know that you have collected the results for all windows with an end time smaller than that watermark, because this is Flink's notion of completeness. This means you can prepare those windows, aggregate their results, and send them downstream to the sink.
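The buffering logic described here can be modelled roughly as follows. This is a plain-Java sketch under stated assumptions, not Flink code: `WatermarkBufferSketch`, `onWindowResult`, and `drainCompleted` are hypothetical names, window results are represented as strings, and the current watermark is passed in as a parameter where a real process function would read it from its context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "buffer until the watermark passes"; it does not use Flink classes.
class WatermarkBufferSketch {
    // Buffered window results, grouped by window end time (ms) in ascending order.
    private final TreeMap<Long, List<String>> resultsByWindowEnd = new TreeMap<>();

    /** Buffers one window result, then returns the batches the watermark marks as complete. */
    List<List<String>> onWindowResult(String result, long windowEndMs, long currentWatermarkMs) {
        resultsByWindowEnd.computeIfAbsent(windowEndMs, end -> new ArrayList<>()).add(result);
        return drainCompleted(currentWatermarkMs);
    }

    /** Removes and returns one batch per window end strictly smaller than the watermark. */
    List<List<String>> drainCompleted(long watermarkMs) {
        Map<Long, List<String>> completed = resultsByWindowEnd.headMap(watermarkMs, false);
        List<List<String>> batches = new ArrayList<>(completed.values());
        completed.clear(); // clearing the view also removes the entries from the buffer
        return batches;
    }

    /** Number of window ends that still have buffered results. */
    int bufferedWindowEnds() {
        return resultsByWindowEnd.size();
    }
}
```

Each returned batch holds the results of every key for one window end, so it could be handed to the sink in a single call. Note that this variant only checks the watermark when another window result arrives.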
|
Hi Stefan,
How would the process function know that the last window result has arrived? The sliding windows slide every 5 minutes, which means that a window for a new time range, or a new watermark, will only arrive after 5 minutes.
Thanks
On 22 June 2017 at 15:10, Stefan Richter <[hidden email]> wrote:
|
Hi Ahmad,
Flink's watermark mechanism guarantees that when you receive a watermark for time t, all records with a timestamp smaller than t have been received as well. The function should buffer all records it receives between watermarks as state, and once it receives a watermark (that is, when a registered event-time timer triggers) it should write the buffered records out. By the way, this only works for event-time windows, not for processing-time windows.
2017-06-22 16:44 GMT+02:00 Ahmad Hassan <[hidden email]>:
|
Thanks for the answers. My scenario is:
| Window A | | Window B | | Window C |
If no events are received for Window C, how would the process function know that both Window A and Window B have finished and need to aggregate their results before the sink is called?
Thanks
On 22 June 2017 at 16:27, Fabian Hueske <[hidden email]> wrote:
|
Let's say Window A and Window B end at 12:00:00 and Window C at 13:00:00. When the ProcessFunction receives a watermark at 12:00:01, it knows that Windows A and B have finished.
2017-06-22 17:44 GMT+02:00 Ahmad Hassan <[hidden email]>:
|
Hi Fabian,
How will the process function be called at 12:00:01 if there are no window outputs or events after 12:00:00?
Thanks
|
You have to register an event-time timer in the `processElement()` method. You'll get a callback to `onTimer()` when the function receives a watermark that is greater than or equal to the registered timer's timestamp. So you can always register a timer for the end time of the next window to get a callback to `onTimer()` once all results for that window have been received. The documentation of the ProcessFunction explains the details [1].
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html
2017-06-22 18:49 GMT+02:00 Ahmad Hassan <[hidden email]>:
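As a rough illustration of the timer approach, the sketch below replays the scenario from this thread (Windows A and B ending at 12:00:00, Window C at 13:00:00) in plain Java. It is a model under stated assumptions, not Flink code: `TimerFlushSketch`, `advanceWatermark`, and `sinkCalls` are hypothetical names, and the methods named after `processElement()` and `onTimer()` only imitate their roles. In Flink itself, timers are only available on a keyed stream, so one option (an assumption here, not something stated in the thread) would be to key the window results by window end time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java model of timer-driven flushing; it does not use Flink classes.
class TimerFlushSketch {
    // One entry per registered timer: window end (ms) -> results buffered for that end.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Stands in for the sink: each element is one batch written in a single call.
    final List<List<String>> sinkCalls = new ArrayList<>();

    /** Models processElement(): buffer the result and register a timer for its window end. */
    void processElement(String result, long windowEndMs) {
        // A second result for the same window end reuses the existing timer entry.
        pending.computeIfAbsent(windowEndMs, end -> new ArrayList<>()).add(result);
    }

    /** Models a watermark arriving: fires every timer at or before the watermark. */
    void advanceWatermark(long watermarkMs) {
        while (!pending.isEmpty() && pending.firstKey() <= watermarkMs) {
            onTimer(pending.pollFirstEntry().getValue());
        }
    }

    /** Models onTimer(): all results for this window end are in, so write them together. */
    private void onTimer(List<String> batch) {
        sinkCalls.add(batch);
    }

    public static void main(String[] args) {
        long noon = 12 * 3_600_000L;
        long onePm = 13 * 3_600_000L;
        TimerFlushSketch fn = new TimerFlushSketch();
        fn.processElement("Window A", noon);
        fn.processElement("Window B", noon);
        fn.processElement("Window C", onePm);
        fn.advanceWatermark(noon + 1_000L); // watermark 12:00:01, no new element required
        System.out.println(fn.sinkCalls);
    }
}
```

The flush is driven by the watermark alone: the sink receives one batch containing Window A and Window B even though no further element arrives, while Window C stays buffered until a later watermark passes 13:00:00.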
|
Thanks Fabian and Stefan for all the help.
Best Regards,
> On 22 Jun 2017, at 18:06, Fabian Hueske <[hidden email]> wrote:
|