Combine two independent streams


Combine two independent streams

Markus Klein
Hello Flink Community,
 
I have a question about combining two independent streams.
The first stream carries events with metrics information; one arrives every 10 seconds. I want to join it with a second stream of events from an application. The result should be an event containing the metrics together with the application events that happened in the last 10 seconds.
 
My first approach was to generate an ID that is incremented after every metric event. This ID is added to the application events and, of course, to the current metric event. This works reasonably well for live events, but for recalculating past events the two streams have to start at the same point in event time.
 
The second approach was to apply a 10-second time window to both the application events and the metrics, and to use the window end time as the key, because Flink windows end at fixed boundaries, e.g. 11:01:10, then 11:01:20, and so on. But this approach only works for past events, because Flink needs another application event to know that 10 seconds have passed for the application events window.
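
The window-end key from the second approach can be sketched in plain Java. This is only an illustration: it assumes tumbling windows with a zero offset, so that the window start is the timestamp rounded down to a multiple of the window size. The class and method names are hypothetical and not part of Flink's API.

```java
// Hypothetical helper (not a Flink class): computes the end of the tumbling
// window that an event timestamp falls into, assuming a zero window offset.
class WindowEndKey {
    // 10-second windows, as described in the post.
    static final long WINDOW_SIZE_MS = 10_000L;

    // Returns the exclusive end timestamp (in ms) of the window containing timestampMs.
    static long windowEnd(long timestampMs) {
        // floorMod keeps the result non-negative even for negative timestamps.
        long start = timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
        return start + WINDOW_SIZE_MS;
    }
}
```

With this alignment, a metric event and an application event that fall into the same 10-second slot get the same key. An event whose timestamp lies exactly on a boundary belongs to the window that starts there, not the one that ends there.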
 
I hope you understand the problem. Is there a way to combine them nicely? I don't want to generate empty "heartbeats" for the application event stream.
 
Thanks for your help.
 
Greetings
Markus

Re: Combine two independent streams

Fabian Hueske-2
Hi Markus,

I'm not sure I understood the issue with the second approach.
Is it that the stream of application events might be empty for some time such that its event time is not increasing?

Best, Fabian


Re: Combine two independent streams

Markus Klein
Hi Fabian,

Yeah, that's basically it. The events window only gets closed when a newer event arrives (one that falls after the 10-second window).
Can I tell Flink to close the event window at timeWindow.getEnd() even if no newer event arrives?

Thanks,
Markus


Re: Combine two independent streams

Fabian Hueske-2
In event-time mode, operators compute their internal time from watermarks.
Depending on how watermarks are generated, their time only increases if records with later timestamps are processed. If no records arrive, no new watermarks are generated and the event time does not advance.

Since you want to reprocess offline data, you cannot use processing time, which uses the wall-clock time of the processing machines.
Instead, you could use a custom periodic watermark that slowly advances the time even if no new data arrives. However, you should be careful, because this could also lead to late-arriving events being dropped. The allowedLateness parameter can help to mitigate the problem.
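
The relationship between the watermark, window firing, and allowedLateness can be illustrated with a small plain-Java model. This is a simplified sketch and not Flink code: it assumes that a window fires once the watermark reaches the window's last timestamp (end minus 1 ms) and that records for the window are dropped once the watermark has moved a further allowedLateness past that point. The class and method names are hypothetical.

```java
// Simplified model of event-time window handling (hypothetical, not a Flink class).
class EventTimeWindowModel {
    private final long endMs;             // exclusive window end, in ms
    private final long allowedLatenessMs; // extra time the window still accepts records, in ms

    EventTimeWindowModel(long endMs, long allowedLatenessMs) {
        this.endMs = endMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    // The window can fire once the watermark has reached its last timestamp.
    boolean canFire(long watermarkMs) {
        return watermarkMs >= endMs - 1;
    }

    // Records for this window are dropped once the watermark has also
    // passed the allowed lateness (assumption of this model).
    boolean isDropped(long watermarkMs) {
        return watermarkMs >= endMs - 1 + allowedLatenessMs;
    }
}
```

In this model, a watermark that stalls below the window end (because no records arrive) means canFire never becomes true, which is the behaviour described in the question. An artificially advanced watermark fixes that, but it also moves the point at which isDropped becomes true closer.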

Hope that helps,
Fabian


Re: Re: Combine two independent streams

Markus Klein
Hi Fabian,
 
Thanks for your very good explanation. However, I don't know exactly how to advance the watermark myself. Do you have an example for me? Do I have to override the getCurrentWatermark method?
 
Thanks,
Markus
 

Re: Re: Combine two independent streams

Fabian Hueske-2
In a periodic watermark assigner, the getCurrentWatermark method is called at regular intervals (configurable via the ExecutionEnvironment).
You could implement the method so that it advances the watermark if it has not been advanced by record timestamps after a certain number of invocations.
Once a new record is received, its timestamp should be used to generate watermarks again.

However, you need to know your data quite well, and this would only help to bridge small gaps of inactivity. I don't think it would help with larger gaps.
Keep in mind that you do not want to advance the watermark too far, because this can result in late data, which might be dropped.
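
A minimal sketch of this idea in plain Java follows. It is not a drop-in Flink class: in a real job the two methods would sit in an implementation of AssignerWithPeriodicWatermarks and getCurrentWatermark would return a Watermark object rather than a long. The class name, the idle-call threshold, and the step size are assumptions chosen for illustration and would have to be tuned to the data.

```java
// Plain-Java sketch of an idle-aware periodic watermark (hypothetical class).
class IdleAwareWatermarkSketch {
    private final long maxIdleCalls; // idle invocations tolerated before advancing (assumed tuning value)
    private final long stepMs;       // how far to advance per further idle invocation (assumed tuning value)

    private long currentWatermark = Long.MIN_VALUE;
    private long maxSeenTimestamp = Long.MIN_VALUE;
    private long idleCalls = 0;
    private boolean sawRecordSinceLastCall = false;

    IdleAwareWatermarkSketch(long maxIdleCalls, long stepMs) {
        this.maxIdleCalls = maxIdleCalls;
        this.stepMs = stepMs;
    }

    // Called for every record: remembers the highest timestamp seen so far.
    long extractTimestamp(long recordTimestampMs) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, recordTimestampMs);
        sawRecordSinceLastCall = true;
        return recordTimestampMs;
    }

    // Called periodically: follows record timestamps while data flows and
    // advances slowly on its own after too many idle invocations.
    long getCurrentWatermark() {
        if (sawRecordSinceLastCall) {
            sawRecordSinceLastCall = false;
            idleCalls = 0;
            // The watermark must never move backwards.
            currentWatermark = Math.max(currentWatermark, maxSeenTimestamp);
        } else if (currentWatermark != Long.MIN_VALUE && ++idleCalls > maxIdleCalls) {
            currentWatermark += stepMs;
        }
        return currentWatermark;
    }
}
```

Because the watermark never moves backwards, a record that arrives after an idle period with a timestamp below the artificially advanced watermark is late. That is the risk mentioned above, and the reason to keep the step small.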

Best, Fabian
