Processing streams of events with unpredictable delays

PedroMrChaves
Hello,

I have a stream source of events. Each event is assigned a timestamp by the machine that generated it, and the events are then retrieved by other machines (collectors). Finally, the collectors send the events to Flink. In Flink, when I receive the events, I extract their timestamps and process them in a windowed fashion.

The problem is that the arrival delay of the events is unpredictable because the collectors can fail. When a collector fails and restarts, it resumes sending the events that it had not sent before, so those events can be delayed by many hours or days (depending on how long the collector was down).

I am trying to think of a way to process those delayed events. As a first approach, I could allow an arbitrary lateness (when assigning watermarks), so that an event that arrives late can still be processed if it is within the max lateness. The problem is that the collectors are very unpredictable, and I can't set a lateness of several days because the memory consumption would keep growing.
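To make the memory concern concrete, here is a minimal sketch in plain Python (not the Flink API). It assumes tumbling event-time windows, a watermark that trails the highest timestamp seen, and a simplified rule that a window's state is kept until the watermark passes the window end plus the allowed lateness. The class name `LatenessWindowCounter` and its parameters are hypothetical, chosen only for illustration.

```python
class LatenessWindowCounter:
    """Counts events per tumbling event-time window (simplified model).

    Window state is retained until the watermark reaches
    window_end + allowed_lateness; events for older windows are dropped.
    """

    def __init__(self, window_size, max_out_of_orderness, allowed_lateness):
        self.window_size = window_size
        self.max_out_of_orderness = max_out_of_orderness
        self.allowed_lateness = allowed_lateness
        self.watermark = float("-inf")
        self.windows = {}   # window_start -> event count (state held in memory)
        self.dropped = []   # timestamps of events that arrived too late

    def on_event(self, timestamp):
        start = timestamp - (timestamp % self.window_size)
        end = start + self.window_size

        # An event is too late if its window's state was already discarded.
        if end + self.allowed_lateness <= self.watermark:
            self.dropped.append(timestamp)
        else:
            self.windows[start] = self.windows.get(start, 0) + 1

        # The watermark trails the highest timestamp seen so far.
        self.watermark = max(self.watermark,
                             timestamp - self.max_out_of_orderness)

        # Discard state of windows that can no longer accept late events.
        expired = [s for s in self.windows
                   if s + self.window_size + self.allowed_lateness <= self.watermark]
        for s in expired:
            del self.windows[s]


if __name__ == "__main__":
    for lateness in (20, 1000):
        counter = LatenessWindowCounter(10, 0, lateness)
        for ts in [5, 15, 25, 35, 7]:
            counter.on_event(ts)
        print(lateness, "retained windows:", len(counter.windows),
              "dropped:", counter.dropped)
```

With a small allowed lateness the late event is dropped but old windows are freed; with a large one the late event is accepted, at the cost of keeping every window in memory for that long.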

So I'm trying to figure out a way to recover the events when a collector stops and restarts. All the events that arrive at my Flink job are stored in persistent storage, so if a collector restarts, I can retrieve the events that belong to the same time window as the late events. The problem is that I need to process those late events in the same way I would if they were arriving on time, but I don't know how I can do that with Flink, or if it's even possible.

The figure below shows an example of my use case.

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n10016/EventLatenessProblem.png>

Events A, B, C, D, E, F, G arrive on time. Then the collector fails, and when it restarts it sends the events H, I, J, K, L, M, which were generated much earlier than the current time.

Regards,
Pedro Chaves
Re: Processing streams of events with unpredictable delays

Aljoscha Krettek
Hi,
In addition to what you mentioned (having a very large allowed lateness), you can also try another approach: add a custom operator in front of the window operation that splits the stream into normal elements and very late elements. Then, on the stream of very late elements, you apply some custom logic that reads the result for the correct window from an external data store and incorporates the late element into that result. You would have to implement your own OneInputStreamOperator for this, however, because only there can you deal with timestamps and watermarks directly.
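As a rough illustration of that routing logic, here is a sketch in plain Python rather than an actual OneInputStreamOperator. It assumes the per-window result is a sum, that a dict stands in for the external data store, and that an element counts as "very late" once it is more than a fixed threshold behind the current watermark. `LateEventRouter`, `very_late_threshold` and `result_store` are hypothetical names, not Flink API.

```python
def window_start(timestamp, window_size):
    """Start of the tumbling window that contains the timestamp."""
    return timestamp - (timestamp % window_size)


class LateEventRouter:
    """Splits elements into normal and very late ones (illustrative only).

    Normal elements are collected for the regular window operation.
    Very late elements are merged into the stored result of their window.
    """

    def __init__(self, window_size, very_late_threshold, result_store):
        self.window_size = window_size
        self.very_late_threshold = very_late_threshold
        self.result_store = result_store   # stand-in for an external store
        self.watermark = float("-inf")
        self.on_time = []                  # elements for the window operation

    def on_watermark(self, watermark):
        # Watermarks only ever move forward.
        self.watermark = max(self.watermark, watermark)

    def on_element(self, timestamp, value):
        if timestamp + self.very_late_threshold <= self.watermark:
            # Very late: read the stored window result, fold the value in,
            # and write the updated result back.
            start = window_start(timestamp, self.window_size)
            previous = self.result_store.get(start, 0)
            self.result_store[start] = previous + value
            return "very_late"
        self.on_time.append((timestamp, value))
        return "normal"


if __name__ == "__main__":
    store = {0: 10}                        # earlier result for window [0, 10)
    router = LateEventRouter(10, 100, store)
    router.on_watermark(500)
    print(router.on_element(505, 1))       # routed to the window operation
    print(router.on_element(3, 5))         # merged into the stored result
    print(store)
```

This only works this simply when the window result can be updated incrementally (a sum, count, min or max). If the result needs all events of the window, the very late branch would have to re-read those events from storage and recompute.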

Cheers,
Aljoscha
