Problem with ProcessFunction timeout feature

Álvaro Vilaplana García
Hi,

Could you please help me with a problem? I have summarised it in the points below; I hope it is clear enough.


a) We have devices, each with its own ID, that we have no control over.

b) These devices send messages carrying an internally generated timestamp that is not synchronised with the other devices.

c) We want to detect when a device stops sending messages.

d) For that, we are using a ProcessFunction

e) The devices put the messages in a Kafka topic, partitioned by ID.

f) We are struggling with the ProcessFunction timeout feature:

We cannot rely on wall-clock (processing) time, because messages from the devices may arrive late and their timestamps do not reflect that delay. We therefore rely on the device timestamps instead.
For each incoming event we register an event-time timer, which the documentation describes as: "Registers a timer to be fired when the event time watermark passes the given time". The problem is that in some cases no further event arrives after the first one, so the watermark never advances and the timeout registered for the original event is never triggered.

As a side note, we have seen in unit tests that Flink seems to emit a watermark of Long.MaxValue (9223372036854775807) after the last event, which hides the problem above.
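To make the failure mode concrete, here is a minimal sketch in plain Java. It is a toy model under stated assumptions, not Flink's implementation: the class `ToyEventTimeTimers` and its methods `advanceWatermark` and `pendingTimers` are hypothetical names, and the 60-second timeout is made up for illustration. It only mirrors the documented rule that an event-time timer fires once the watermark reaches its timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model of event-time timers. Hypothetical class, not Flink's TimerService. */
class ToyEventTimeTimers {
    private final TreeSet<Long> timers = new TreeSet<>();
    private long watermark = Long.MIN_VALUE;

    /** Registers a timer for the given event time. */
    void registerEventTimeTimer(long time) {
        timers.add(time);
    }

    /** Moves the watermark forward (never backward) and returns the timers that fire. */
    List<Long> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    int pendingTimers() {
        return timers.size();
    }

    public static void main(String[] args) {
        ToyEventTimeTimers service = new ToyEventTimeTimers();
        long timeoutMs = 60_000L; // assumed timeout, for illustration only

        // The first (and only) event of a device arrives with timestamp 1000.
        service.advanceWatermark(1_000L);
        service.registerEventTimeTimer(1_000L + timeoutMs);

        // No further events: the watermark stays put and the timer stays pending.
        System.out.println("pending timers: " + service.pendingTimers());

        // A final Long.MAX_VALUE watermark, as observed in the unit tests, fires everything.
        System.out.println("fired at end of input: " + service.advanceWatermark(Long.MAX_VALUE));
    }
}
```

In this model the timer stays pending for as long as nothing moves the watermark, which matches the behaviour described above, and the final maximum watermark explains why a bounded unit test does not show it.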

I am using Scala 2.11 and Flink 1.2.0.

Regards

Re: Problem with ProcessFunction timeout feature

Stefan Richter
Hi,

If I understand correctly, your problem is that event time does not progress when you receive no events, so you cannot detect device timeouts. Would it make sense to have your source periodically emit artificial events that advance the watermark in the absence of device events, with a gap large enough that you can safely assume no event with a smaller timestamp will arrive from any device in the future? How else could Flink advance event time without receiving further events?
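One way to picture this suggestion is sketched below in plain Java. Note that it swaps literal artificial events for an equivalent watermark estimate that keeps advancing while the source is idle. Everything here is an assumption made for illustration: the class `IdleAwareWatermark`, its methods, the 10-second gap, and above all the premise that device clocks advance at roughly the same rate as the wall clock. It is not a Flink class.

```java
/** Toy watermark estimate that keeps advancing while no events arrive. Hypothetical, not a Flink class. */
class IdleAwareWatermark {
    private final long gapMs;                    // safety gap subtracted from the estimate
    private long maxEventTs = Long.MIN_VALUE;    // highest event timestamp seen so far
    private long wallClockAtMaxTs = 0L;          // wall-clock time when that event arrived
    private long lastWatermark = Long.MIN_VALUE; // watermarks must never go backward

    IdleAwareWatermark(long gapMs) {
        this.gapMs = gapMs;
    }

    /** Records an event timestamp together with the wall-clock time of its arrival. */
    void onEvent(long eventTs, long wallClockMs) {
        if (eventTs > maxEventTs) {
            maxEventTs = eventTs;
            wallClockAtMaxTs = wallClockMs;
        }
    }

    /** Estimates the watermark, extrapolating by the idle time since the newest event. */
    long currentWatermark(long wallClockMs) {
        if (maxEventTs == Long.MIN_VALUE) {
            return lastWatermark; // nothing seen yet
        }
        long idleMs = Math.max(0L, wallClockMs - wallClockAtMaxTs);
        lastWatermark = Math.max(lastWatermark, maxEventTs + idleMs - gapMs);
        return lastWatermark;
    }

    public static void main(String[] args) {
        IdleAwareWatermark wm = new IdleAwareWatermark(10_000L);
        wm.onEvent(1_000L, 5_000L);
        System.out.println("right after the event: " + wm.currentWatermark(5_000L));
        System.out.println("after 70 s of silence: " + wm.currentWatermark(75_000L));
    }
}
```

The trade-off is the one named in the suggestion: any event that later arrives with a timestamp below the extrapolated watermark is treated as late, so the gap has to be chosen conservatively.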

Best,
Stefan

Re: Problem with ProcessFunction timeout feature

Álvaro Vilaplana García
Hi Stefan,

Thank you so much for your answer.

Regarding the 'artificial events', our main problem is that we have no control at all over the devices.

I have been reading more about event time and watermarks. My understanding is that when we use event time (device time), Flink has no notion of time of its own, and the watermark is the way to tell Flink the current time of the stream (no more events with an event time earlier than the watermark are expected). That would explain why we always need an event to advance the watermark. Does that make sense?


I understood that watermarks are tracked per partition (keyBy(deviceId)). Is that right?


Cheers

Re: Problem with ProcessFunction timeout feature

Stefan Richter
Hi,

Yes, I think you have understood the basic concept of watermarks. Events basically drive "the event time clock", so it can only advance when you see events. I am not sure I understood the part about partitions correctly, but the watermark event time is a global thing. For example, if your source reads multiple Kafka partitions, each partition can have a different current watermark. However, the source must determine the current event time of the stream, e.g. as the minimum of the watermarks of all the Kafka partitions it reads.
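The "minimum over partitions" rule can be sketched in a few lines of plain Java. This is a toy illustration only: the class `PartitionWatermarks` and its methods are hypothetical and do not correspond to any Flink or Kafka connector class, and partitions that have not reported yet are simply not represented.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model: combine per-partition watermarks into one stream watermark. Hypothetical class. */
class PartitionWatermarks {
    private final Map<Integer, Long> perPartition = new HashMap<>();

    /** Records a partition's watermark; a partition's watermark never moves backward. */
    void update(int partition, long watermark) {
        perPartition.merge(partition, watermark, Long::max);
    }

    /** The stream watermark is the minimum over all partitions that have reported so far. */
    long combined() {
        if (perPartition.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long w : perPartition.values()) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        PartitionWatermarks wm = new PartitionWatermarks();
        wm.update(0, 100L);
        wm.update(1, 40L);
        wm.update(2, 70L);
        // The slowest partition (1) holds the whole stream back.
        System.out.println("combined watermark: " + wm.combined());
    }
}
```

A consequence worth noting for the device scenario: one silent partition keeps the combined watermark from advancing, even while the other partitions continue to receive events.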

One thing that might help in your use case is a combination of event time and processing time. In the process function, after each device event, you could register a timer far enough ahead in processing time that it can serve as a signal to check for timeouts, because you have not received events for a long time.

Best,
Stefan

Re: Problem with ProcessFunction timeout feature

Álvaro Vilaplana García
Hi Stefan,

Thank you for sharing your knowledge; it is much appreciated.

According to the documentation:

void registerEventTimeTimer(long time); -> 'Registers a timer to be fired when the event time watermark passes the given time.'

Don't we have the same problem? We would need an event (and that event may not arrive soon) to advance the watermark and trigger the timer.

Or is there another way of setting the watermark, based on processing time instead of event time?

Cheers

Re: Problem with ProcessFunction timeout feature

Álvaro Vilaplana García
Hi Stefan,

You mean this one:

/**
 * Registers a timer to be fired when processing time passes the given time.
 *
 * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
 * in a keyed context, such as in an operation on
 * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
 * will also be active when you receive the timer notification.
 */
void registerProcessingTimeTimer(long time);

Am I right?

Cheers

Re: Problem with ProcessFunction timeout feature

Stefan Richter
Yes, exactly. The idea is that you operate in event time but combine it with processing-time timers to trigger timeout detection. Could that help in your case?
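A minimal sketch of that timeout logic, written as plain Java so that it runs without Flink. The class `DeviceTimeoutDetector`, its two methods, and the 60-second timeout are hypothetical. In a real job, `onEvent` would roughly correspond to the per-element callback of the process function, where `registerProcessingTimeTimer(long time)` is called, and `onTimer` to the timer callback; keyed state would replace the map. Event timestamps stay available for the rest of the pipeline; only the timeout deadline uses processing time.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of per-device timeout detection with processing-time timers. Hypothetical class. */
class DeviceTimeoutDetector {
    private final long timeoutMs;
    // Processing time at which each device was last heard from.
    private final Map<String, Long> lastSeen = new HashMap<>();

    DeviceTimeoutDetector(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Handles a device event; returns the processing time a timer would be registered for. */
    long onEvent(String deviceId, long nowMs) {
        lastSeen.put(deviceId, nowMs);
        return nowMs + timeoutMs;
    }

    /** Handles a timer; reports a timeout only if no newer event has arrived in the meantime. */
    boolean onTimer(String deviceId, long timerTimeMs) {
        Long last = lastSeen.get(deviceId);
        if (last != null && timerTimeMs >= last + timeoutMs) {
            lastSeen.remove(deviceId); // the device is considered gone until it sends again
            return true;
        }
        return false; // stale timer: a later event pushed the deadline forward
    }

    public static void main(String[] args) {
        DeviceTimeoutDetector detector = new DeviceTimeoutDetector(60_000L);
        long firstTimer = detector.onEvent("device-1", 1_000L);
        long secondTimer = detector.onEvent("device-1", 30_000L);
        System.out.println("first timer reports timeout: " + detector.onTimer("device-1", firstTimer));
        System.out.println("second timer reports timeout: " + detector.onTimer("device-1", secondTimer));
    }
}
```

Checking the stored arrival time inside the timer callback avoids having to cancel older timers: a timer that fires after a newer event has arrived is simply ignored.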

Re: Problem with ProcessFunction timeout feature

Álvaro Vilaplana García
Well, it sounds very reasonable to me!

I will let you know how it goes.

