Hi all,
In Flink, after setting the time characteristic to event time and properly assigning timestamps/watermarks, time-based windows will be created based upon event time. If we need to process events within a window in event time order, we can sort the windowed values and process as necessary by applying a WindowFunction. However, as I understand it, there is no guarantee that time-based windows will be processed in time order. Is this correct? Or, if we assume a watermarking system that (for example's sake) does not allow any late events, is there a way within Flink to guarantee that windows will be processed (via an applied WindowFunction) in strictly increasing time order? If necessary, I can provide a more concrete explanation of what I mean/am looking for. Thanks! David
Hi David, You are right, the events in the window are not sorted according to the EventTime, so the processing is not done in increasing order of timestamp. As you said, you will have to do the sorting yourself in your window function to make sure that you are processing the events in order. What Flink does (when EventTime is set and timestamps are assigned) is assign the elements to windows based on their EventTime; with ProcessingTime, the same elements might have ended up in a different window. This is as per my limited knowledge, other Flink experts can correct me if this is wrong. Thanks, Vishnu On Wed, Jul 20, 2016 at 9:30 PM, David Desberg <[hidden email]> wrote:
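A minimal sketch of the sorting approach described above, assuming a hypothetical Event POJO with a getTimestamp() accessor and a String key (neither is defined in this thread); it buffers a window's elements and emits them in event-time order:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Buffers the elements of one window, sorts them by event timestamp,
// and emits them in increasing event-time order.
public class SortedWindowFunction implements WindowFunction<Event, Event, String, TimeWindow> {

    @Override
    public void apply(String key, TimeWindow window, Iterable<Event> input, Collector<Event> out) {
        List<Event> buffer = new ArrayList<Event>();
        for (Event e : input) {
            buffer.add(e);
        }
        Collections.sort(buffer, new Comparator<Event>() {
            @Override
            public int compare(Event a, Event b) {
                return Long.compare(a.getTimestamp(), b.getTimestamp());
            }
        });
        for (Event e : buffer) {
            out.collect(e); // downstream sees this window's events in order
        }
    }
}
```

Note this only orders events within a single window firing; ordering across windows is what the rest of this thread discusses.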
Hi, If watermarks arrive from multiple sources, how long does the Event Time Trigger wait for the slower source to send its watermarks before triggering only from the faster source? I have seen that if one of the sources is really slow, then the window fires with the elements of the faster source, and when the elements arrive from the slower source the same window fires again with the new elements only. I can work around this by adding delays, but does merging watermarks require that both have arrived by the time the watermarks progress to the point where a window can be triggered? Is applying a delay in the watermark the only way to solve this? Sameer Sent from my iPhone
Hi David, windows are processed in order of their end timestamp. So if you specify an allowed lateness of zero (which will only be possible in Flink 1.1 or by using a custom trigger) you should be able to sort the elements. The ordering is only valid within one key, though, since windows for different keys with the same end timestamp will be processed in an arbitrary order.

@Sameer If both sources emit watermarks that are correct for the elements that they are emitting, the trigger should only fire when both sources have progressed their watermarks sufficiently far. Could you maybe give a more detailed example of the problem that you described? Cheers, Aljoscha On Thu, 21 Jul 2016 at 04:03 Sameer Wadkar <[hidden email]> wrote:
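To make the first part concrete, a hedged sketch (assuming Flink 1.1+, the hypothetical Event type from above with an assumed getKey() accessor, and the SortedWindowFunction sketched earlier) wiring zero allowed lateness into 5-second event-time windows:

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class OrderedWindows {
    // Per key, each 5-second event-time window fires once at its end timestamp
    // (no late firings), and SortedWindowFunction emits its contents in order.
    public static DataStream<Event> applyOrderedWindows(DataStream<Event> events) {
        return events
            .keyBy(new KeySelector<Event, String>() {
                @Override
                public String getKey(Event e) {
                    return e.getKey(); // assumed key accessor
                }
            })
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .allowedLateness(Time.seconds(0)) // Flink 1.1+: drop anything behind the watermark
            .apply(new SortedWindowFunction());
    }
}
```

As noted above, this ordering guarantee holds within each key, not across keys.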
Thanks, Aljoscha. This is what I am seeing when I use ascending timestamps as watermarks. Consider a window of 1-5 seconds:
- Stream 1 sends elements A,B
- Stream 2 (20 seconds later) sends elements C,D
I see Window (1-5) fire first with just A,B. After 20 seconds Window (1-5) fires again, but this time with only C,D. If I add a delay where I lag the watermarks by 20 seconds, then only one instance of Window (1-5) fires with elements A,B,C,D. Sameer On Thu, Jul 21, 2016 at 5:17 AM, Aljoscha Krettek <[hidden email]> wrote:
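The "lag the watermarks by 20 seconds" workaround can be written as a periodic watermark assigner whose watermark trails the highest timestamp seen so far; a rough sketch using the same hypothetical Event type (the 20-second lag mirrors the example above, it is not a recommended value):

```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Emits watermarks that lag the maximum observed event timestamp by a fixed
// amount, giving slower sources time to deliver their elements for a window.
public class LaggingWatermarks implements AssignerWithPeriodicWatermarks<Event> {

    private static final long MAX_LAG_MS = 20_000; // assumed 20 s lag, as in the example

    private long maxSeenTimestamp = Long.MIN_VALUE + MAX_LAG_MS;

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, element.getTimestamp());
        return element.getTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // The watermark trails the highest timestamp seen so far by MAX_LAG_MS.
        return new Watermark(maxSeenTimestamp - MAX_LAG_MS);
    }
}
```

It would be attached to each source stream with assignTimestampsAndWatermarks(new LaggingWatermarks()).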
Yes, that is to be expected. Stream 2 should only send the watermark once the elements with a timestamp lower than the watermark have been sent as well. On Thu, 21 Jul 2016 at 13:10 Sameer W <[hidden email]> wrote:
Stream2 does send watermarks only after it sees elements C,D. It sends the watermark (5) 20 seconds after Stream 1 sends it. From what I understand, Flink merges watermarks from both streams on the reduce side. But does it wait a certain pre-configured amount of time (for watermarks from both streams to arrive) before it finally fires the window for the first stream? On Thu, Jul 21, 2016 at 7:52 AM, Aljoscha Krettek <[hidden email]> wrote:
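For what it is worth, there is no configurable wait time here: an operator with several inputs tracks the last watermark received on each input channel and uses the minimum as its own event-time clock, so a window can only fire once the slowest input's watermark has passed the window end. A toy illustration of that rule (not Flink's actual internal code):

```java
// Toy model: the operator's event-time clock is the minimum of the
// watermarks last received on each of its input channels.
public final class WatermarkMerging {

    static long combinedWatermark(long[] lastWatermarkPerInput) {
        long min = Long.MAX_VALUE;
        for (long wm : lastWatermarkPerInput) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        // Stream 1 has advanced its watermark to 5 but Stream 2 is still at 0,
        // so the combined watermark stays at 0 and window (1-5) cannot fire yet.
        System.out.println(combinedWatermark(new long[] {5, 0})); // prints 0
    }
}
```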
Aljoscha, Awesome. Exactly the behavior I was hoping would be exhibited. Thank you for the quick answer :) Thanks, David On Thu, Jul 21, 2016 at 2:17 AM, Aljoscha Krettek <[hidden email]> wrote:
Aljoscha - Thanks, it works exactly as you said. I found out why my windows were firing twice: I was making the error of adding the AutoWatermarkInterval to the existing watermark each time the watermark was sampled from the source, just to fire a window if one of the sources was delayed substantially. But doesn't this mean that if one of the sources stops sending data (a device losing internet connectivity temporarily, say), then such a pipeline would just freeze and windows would keep accumulating on the reduce side, as the other sources would keep sending data and their watermarks? Isn't this a risk for a possible out-of-memory error? Should one always use the RocksDB state backend to mitigate such risks? Sameer On Thu, Jul 21, 2016 at 7:52 AM, Aljoscha Krettek <[hidden email]> wrote:
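On the state-size question, switching to the RocksDB state backend is a one-line configuration change; a minimal sketch, assuming the RocksDB state backend dependency is on the classpath and using a placeholder checkpoint URI:

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Keyed and window state is kept in RocksDB on local disk instead of the
        // JVM heap; checkpoints of that state go to the given (placeholder) URI.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        // ... sources, watermark assignment, windows, sinks ...
        env.execute("RocksDB-backed job");
    }
}
```

This moves the pressure from heap to disk, but if a source's watermark never advances, the buffered window state still grows without bound.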
@Sameer, yes, if one source stops emitting watermarks then downstream operations will buffer data until the source starts updating the watermark again. If you can live with some data being late, you could change the watermark logic in the source to start advancing the watermark if no new data arrives for a while. This will let downstream operations continue processing, but elements that might arrive at the source in the future will then be considered late. The way you generate watermarks essentially lets you tune between correctness and latency. Cheers, Aljoscha On Thu, 21 Jul 2016 at 23:30 Sameer W <[hidden email]> wrote:
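One possible shape for that idle-source logic, again a hedged sketch with the assumed Event type and an arbitrary idle threshold:

```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// If no elements have arrived for IDLE_TIMEOUT_MS, keep advancing the watermark
// with wall-clock time so downstream windows can still fire; elements that show
// up later than the advanced watermark will then be considered late.
public class IdleTolerantWatermarks implements AssignerWithPeriodicWatermarks<Event> {

    private static final long IDLE_TIMEOUT_MS = 30_000; // assumed idle threshold

    private long maxSeenTimestamp = Long.MIN_VALUE;
    private long lastElementWallClock = System.currentTimeMillis();

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, element.getTimestamp());
        lastElementWallClock = System.currentTimeMillis();
        return element.getTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        long idleFor = System.currentTimeMillis() - lastElementWallClock;
        if (idleFor > IDLE_TIMEOUT_MS) {
            // Source is idle: push the watermark past the last seen timestamp by
            // the idle time beyond the threshold, trading correctness for latency.
            return new Watermark(maxSeenTimestamp + (idleFor - IDLE_TIMEOUT_MS));
        }
        return new Watermark(maxSeenTimestamp);
    }
}
```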