Hello,
I have a stream source of events. Each event is assigned a timestamp by the machine that generated it, and the events are then retrieved by other machines (collectors). Finally, the collectors send the events to Flink. In Flink, when I receive the events, I extract their timestamps and process them in a windowed fashion.

The problem is that the event timestamps are unpredictable because the collectors can fail. When a collector fails and restarts, it sends the events that it had not sent before, so those events can be delayed by many hours or days (depending on how long the collector was down). I am trying to think of a way to process those delayed events.

As a first approach I could allow some maximum lateness (when assigning watermarks), so that when an event arrives late I can still process it if it is within that maximum lateness. The problem is that the collectors are very unpredictable, and I can't set a lateness of several days because the memory consumption would keep growing.

So I'm trying to figure out a way to recover the events when a collector stops and restarts. All the events that arrive at my Flink job are stored in persistent storage, so if a collector restarts, I can retrieve the events that belong to the same time window as the late events. The problem is that I need to process those late events in the same way I would if they were arriving on time, but I don't know how to do that with Flink, or whether it's even possible.

Depicted in the figure below is an example of my use case.

![]()

Events A, B, C, D, E, F, G arrive on time. Then the collector fails, and when it restarts it sends the events H, I, J, K, L, M, which were generated much earlier than the current time.

Regards,
Pedro Chaves
Hi,

in addition to what you mentioned (having a very large allowed lateness), you can also try another approach: add a custom operator in front of the window operation that splits the stream into normal elements and very late elements. Then, in the stream of very late elements, you have some custom logic that reads the result for the correct window from an external data store and incorporates the late element into that result.

You would have to implement your own OneInputStreamOperator for this, however, because only there can you directly deal with timestamps and watermarks.

Cheers,
Aljoscha

On Wed, 9 Nov 2016 at 17:57 PedroMrChaves <[hidden email]> wrote:
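To make the splitting idea in the reply above a bit more concrete, here is a minimal sketch in plain Java. It deliberately uses no Flink classes, so it is not the OneInputStreamOperator itself, only the decision logic such an operator might contain. Everything in it is an assumption made for illustration: the class name `LateEventRouter`, tumbling windows of a fixed size, a watermark approximated by the highest timestamp seen so far, a simple per-window event count as the "result", and an in-memory map standing in for the external data store.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical plain-Java sketch: separate on-time events from very late ones. */
final class LateEventRouter {
    enum Route { ON_TIME, VERY_LATE }

    private final long windowSizeMs;      // assumed tumbling window length
    private final long allowedLatenessMs; // how far behind the watermark still counts as on time
    private long watermark = Long.MIN_VALUE; // highest event timestamp seen so far

    // Stand-in for the external data store: window start -> stored event count.
    private final Map<Long, Long> storedCounts = new HashMap<>();

    LateEventRouter(long windowSizeMs, long allowedLatenessMs) {
        this.windowSizeMs = windowSizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    /** Start of the tumbling window that contains the given timestamp. */
    long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Classifies an event; only on-time events advance the watermark. */
    Route route(long timestampMs) {
        boolean hasWatermark = watermark != Long.MIN_VALUE;
        if (hasWatermark && timestampMs < watermark - allowedLatenessMs) {
            return Route.VERY_LATE; // would go to the custom late-event logic
        }
        watermark = Math.max(watermark, timestampMs);
        return Route.ON_TIME; // would go to the normal window operation
    }

    /** Simulates the window operation writing its result to the store. */
    void storeWindowResult(long windowStartMs, long count) {
        storedCounts.put(windowStartMs, count);
    }

    /** Late path: read the stored result for the event's window and add the event to it. */
    long mergeLate(long timestampMs) {
        return storedCounts.merge(windowStart(timestampMs), 1L, Long::sum);
    }

    long storedCount(long windowStartMs) {
        return storedCounts.getOrDefault(windowStartMs, 0L);
    }
}
```

In this sketch an event is treated as very late once its timestamp falls more than `allowedLatenessMs` behind the watermark; such events skip the window operation and update the stored result directly. A real job would replace the map with reads and writes against the persistent storage and would need to decide how the updated result is emitted downstream.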