Hi Guys,
Just wanted to get an idea of why Flink decided to completely discard late elements in the latest version? This was not the case in 1.0.3.

P.S. In our case the data is critical, so we cannot discard a single record even if it is late. I have written a custom trigger (as suggested by Aljoscha) to accept even late elements.

Regards,
Vinay Patil
We have the same requirement - we cannot discard any data even if it arrives late.

- LF
Hi LF,

So did you manage to find a workaround for it? I am using a custom trigger which is similar to the 1.0.3 one, with a few changes.

Regards,
Vinay Patil
Not yet.
I'm hoping a Flink expert on this mailing list will reply.

- LF
Hello LF and Vinay,
With the introduction of "allowed lateness", elements and windows are kept around until the watermark passes window.maxTimestamp + allowed_lateness, and then they are cleaned up (garbage collected). Every element that comes in and belongs to a window that has already been garbage collected is dropped as super-late. Elements that are late, but by no more than the allowed lateness, are kept, and the window fires as before. If you know what your maximum latency is, then the best way is to set the allowed lateness to that value. Currently Flink drops super-late elements and does not provide any mechanism to manually handle them, BUT there are discussions about adding such functionality, which would (probably) allow you to have a separate stream containing only these elements.

Thanks,
Kostas
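[For reference, here is a minimal sketch of how allowed lateness is wired into a windowed pipeline (Flink 1.1+ DataStream API); the Event type, its getKey() accessor, and MyWindowFunction are hypothetical placeholders:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// input must already have timestamps and watermarks assigned
DataStream<Event> windowed(DataStream<Event> input) {
    return input
        .keyBy(event -> event.getKey())
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        // elements up to 10 minutes late still trigger the window;
        // anything arriving after that is dropped as super-late
        .allowedLateness(Time.minutes(10))
        .apply(new MyWindowFunction());
}]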
Hi Kostas,
Thank you for your reply. Yes, that would be good functionality to have, but for now the custom trigger close to the 1.0.3 behaviour works for me.

Regards,
Vinay Patil
Hi Vinay,
From what I understand from your code, the only difference between your trigger and the one shipping with Flink is that for late elements, instead of firing and keeping the element, you fire and purge, i.e. clear the window state. This does not solve the problem of super-late elements being dropped once their window has expired (currentWatermark >= window.maxTimestamp + allowedLateness).

Cheers,
Kostas
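[In code form, the super-late condition Kostas describes is roughly the following; this is a sketch of the check, not Flink's actual internal method:

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Once the watermark has passed the end of the window plus the allowed
// lateness, the window state has been garbage collected and any element
// still assigned to that window is silently dropped.
static boolean isSuperLate(TimeWindow window, long allowedLateness, long currentWatermark) {
    return currentWatermark >= window.maxTimestamp() + allowedLateness;
}]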
Hi Kostas,

The late elements are immediately getting triggered with the code I have sent. I have tested it with a test case as follows (I am doing an outer-join operation via the union of stream1 and stream2):

1. Push 5 records to Kafka Topic 1 -----> sourceStream1
2. Wait for a few minutes (by this time the sourceStream1 elements have been triggered as non-matched elements)
3. Push the matching 5 records to Kafka Topic 2 ------> sourceStream2 (here the watermark has already moved ahead)

Now, according to the custom trigger, whenever onElement is called it immediately fires the window and purges it. I have kept the onEventTime code the same as 1.0.3, where it fires and purges.

Am I doing something wrong? (I have kept allowedLateness at Long.MAX_VALUE.)

Regards,
Vinay Patil
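[For readers following along, here is a minimal sketch of a trigger with the behaviour Vinay describes; this is an assumption-based reconstruction, not his actual code. On-time windows fire and purge when the event-time timer hits; late elements fire and purge immediately on arrival:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeTriggerWithLateFiring extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // late element: the window has already fired once, so emit
            // the current contents right away and clear the window state
            return TriggerResult.FIRE_AND_PURGE;
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // same as the 1.0.3 behaviour: fire and purge at the end of the window
        return time == window.maxTimestamp()
                ? TriggerResult.FIRE_AND_PURGE
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}]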
Hi Vinay,
By setting the allowed lateness to Long.MAX_VALUE you are OK. Sorry, I forgot that this was the default value. Just a note (although you have it right in your code): in this case you should always FIRE_AND_PURGE and not just FIRE; otherwise your state will keep growing, as it is never garbage collected.

Cheers,
Kostas
Hi Kostas,

Yes, you are right, I am always doing FIRE_AND_PURGE. If we don't do this and only use FIRE, the window function gets the elements in an incremental fashion (1, 2, 3... and so on); I had observed this while testing.

Can you please explain the importance of the canMerge and onMerge functions of a trigger? From the Javadocs I got it at a high level, but I am not able to understand it completely (so I am not using these functions for now in the custom trigger).

Regards,
Vinay Patil
Hi Vinay,
These methods are useful when using your trigger with session windows. When using session windows, the state of a window, and that of the corresponding trigger, has to be merged with that of other windows. These methods do exactly that: canMerge() says whether the trigger can be used with session windows, and if so, onMerge() contains the logic for what to do with the trigger state when windows merge. As an example you can check out the CountTrigger. I hope this answers your question.

Cheers,
Kostas
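[For illustration, here is a sketch of a count-based trigger with the merging hooks filled in, modelled loosely on Flink's CountTrigger; the class and field names are made up for this example:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/** Fires once maxCount elements have been seen; usable with session windows. */
public class MergeableCountTrigger extends Trigger<Object, TimeWindow> {

    private final long maxCount;

    // count kept per window; when session windows merge, the partial
    // counts of the merged windows have to be combined as well
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    public MergeableCountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        // declares that this trigger is safe to use with merging
        // window assigners such as session windows
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
        // combine the partial count state of the windows being merged
        ctx.mergePartialState(stateDesc);
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}]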
Yes Kostas, thank you for the explanation, I will take a look.

Regards,
Vinay Patil