Hi,
I have a bunch of devices that keep sending heartbeat messages. I want to make an operator that emits messages when a device disconnects and when a device stops being disconnected. A device is considered disconnected if we don't receive any heartbeat for more than some TIMEOUT duration. This seemed like a good candidate for session windows, but I am not sure how I can express the inverse logic (i.e. detecting periods of inactivity instead of activity) using Flink's operators. I want to use event time for all processing and ideally want to achieve this behaviour using a single operator. So I am trying to implement a custom processfunction that, on every heartbeat:
The basic idea is that every new heartbeat will keep pushing the timer forward. Only when heartbeats stop arriving does the timer fire, indicating the start of a disconnected state. Code: public class IUDisconnectedStateDetector extends KeyedProcessFunction<IUMonitorKey, IUHeartbeat, IUSessionMessage> {
However, the above code does not behave as expected - the timer fires even when (a) it has received heartbeats within the timeout and (b) I have the code to delete it. So, my questions:
Thanks! |
Hi Manas,
at the first glance your code looks correct to me. I would investigate if your keys and watermarks are correct. Esp. the watermark frequency could be an issue. If watermarks are generated at the same time as the heartbeats itself, it might be the case that the timers fire first before the process() function is called which resets the timer. Maybe you can give us more information how watermarks are generated? Regards, Timo On 11.08.20 08:33, Manas Kale wrote: > Hi, > I have a bunch of devices that keep sending heartbeat messages. I want > to make an operator that emits messages when a device disconnects and > when a device stops being disconnected. > A device is considered disconnected if we don't receive any heartbeat > for more than some TIMEOUT duration. > This seemed like a good candidate for session windows, but I am not sure > how I can express the inverse logic (i.e. detecting periods of > inactivity instead of activity) using Flink's operators. > I want to use event time for all processing and ideally want to achieve > this behaviour using a single operator. > > So I am trying to implement a custom processfunction that, on every > heartbeat: > > * Deletes any previous event time timer > * Registers a new timer to fire at heartbeat.timestamp + TIMEOUT > > The basic idea is that every new heartbeat will keep pushing the timer > forward. Only when heartbeats stop arriving does the timer fire, > indicating the start of a disconnected state. > Code: > > public class IUDisconnectedStateDetectorextends KeyedProcessFunction<IUMonitorKey, IUHeartbeat, IUSessionMessage> { > > // Tracks if this monitor is disconnected or not. > private ValueState<Boolean>isDisconnectedStateStore; > // Tracks which timer was registered. > private ValueState<Long>registeredTimerStateStore; > > private final LoggerLOGGER = LoggerFactory.getLogger(IUDisconnectedStateDetector.class); > > // Called by the Flink runtime before starting this operator. We > initialize the state stores here. > @Override > public void open(Configuration parameters)throws Exception { > isDisconnectedStateStore = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>( > DISCONNECTED_STATE_STORE_NAME, Boolean.class)); > registeredTimerStateStore = getRuntimeContext().getState(new ValueStateDescriptor<Long>( > REGISTERED_TIMER_STATE_STORE_NAME, Long.class)); > } > > @Override > public void processElement(IUHeartbeat heartbeat, Context ctx, Collector<IUSessionMessage> out)throws Exception { > Boolean isDisconnected =isDisconnectedStateStore.value(); > LOGGER.info("Watermark: " + heartbeat +", isDisconnected : " + isDisconnected > +" last registered timer :" +registeredTimerStateStore.value()); > > > // If this is the first message for this monitor or is the first message > after a disconnection. > if (isDisconnected ==null || isDisconnected == Boolean.TRUE) { > // Delete previous timer. > if (registeredTimerStateStore.value() !=null) > ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value()); > > // Register a timer that will fire in the future if no further events > are received. > long timerFiringTimestamp = heartbeat.getTimestamp() +DISCONNECTED_TIMEOUT; > ctx.timerService().registerEventTimeTimer(timerFiringTimestamp); > registeredTimerStateStore.update(timerFiringTimestamp); > > // Emit a message indicating END of the disconnected state. > IUSessionMessage message =new IUSessionMessage( > new IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(),"dummy","dummy"), > new IUSessionInfo(heartbeat.getTimestamp(), IUStatus.ENDED, IUEventType.NO_VALUE)); > out.collect(message); > LOGGER.info(message.getSessionInfo().toString()); > // Update the state store. > isDisconnectedStateStore.update(Boolean.FALSE); > } > } > > > @Override > public void onTimer(long timestamp, OnTimerContext ctx, Collector<IUSessionMessage> out)throws Exception { > if (isDisconnectedStateStore.value() == Boolean.FALSE) { > // If this timer fires that means no message was received from the > monitor for some timeout duration. > // Update the state store. > isDisconnectedStateStore.update(Boolean.TRUE); > > // Emit a message indicating START of the disconnected state. Note that > since this is applicable for a monitor, > IUSessionMessage message =new IUSessionMessage( > new IUMonitorFeatureKey(ctx.getCurrentKey().getMonitorName(),"dummyFeatureName","dummyDeviceId"), > new IUSessionInfo(timestamp, IUStatus.STARTED, IUEventType.NO_VALUE)); > out.collect(message); > > LOGGER.info(message.getSessionInfo().toString()); > } > } > } > > *However, the above code does not behave as expected - the timer fires > even when (a) it has received heartbeats within the timeout and (b) I > have the code to delete it*. So, my questions: > > * Am I deleting the timer incorrectly? I use a state store to keep > track of registered timer's timestamps and use that value when deleting. > * Am I overcomplicating things? Can this be achieved using Flink's > inbuild session windowing operators? > > Thanks! |
Hi Timo, I got it, the issue was a (silly) mistake on my part. I unnecessarily put all the processElement() logic inside the if condition. The if() condition is there because I want to emit a disconnected STOPPED message only once. So the correct code is : @Override This produces the expected output. Also, I will assume that this is the best way to solve my problem - I can't use Flink's session windows. Let me know if anyone has any other ideas though! Thank you for your time and quick response! On Tue, Aug 11, 2020 at 1:45 PM Timo Walther <[hidden email]> wrote: Hi Manas, |
Sometimes it's not easy to spot the obvious ;-)
Great that it works now. Let us know if you have further questions. Regards, Timo On 11.08.20 10:51, Manas Kale wrote: > Hi Timo, > I got it, the issue was a (silly) mistake on my part. I unnecessarily > put all the processElement() logic inside the if condition. The if() > condition is there because I want to emit a disconnected STOPPED message > only once. > So the correct code is : > > @Override > public void processElement(IUHeartbeat heartbeat, Context ctx, Collector<IUSessionMessage> out)throws Exception { > Boolean isDisconnected =isDisconnectedStateStore.value(); > // LOGGER.info("Watermark: " + ctx.timerService().currentWatermark() + " > Processing timestamp : "+ heartbeat.getTimestamp() + ", isDisconnected : > " + isDisconnected > // +" last registered timer :" + registeredTimerStateStore.value()); > > // Delete previous timer. > if (registeredTimerStateStore.value() !=null) > ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value()); > > // Register a timer that will fire in the future if no further events > are received. > long timerFiringTimestamp = heartbeat.getTimestamp() +DISCONNECTED_TIMEOUT; > ctx.timerService().registerEventTimeTimer(timerFiringTimestamp); > registeredTimerStateStore.update(timerFiringTimestamp); > > // If this is the first message for this monitor or is the first message > after a disconnection. > if (isDisconnected ==null || isDisconnected == Boolean.TRUE) { > // Emit a message indicating END of the disconnected state. > IUSessionMessage message =new IUSessionMessage( > new IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(),"dummy","dummy"), > new IUSessionInfo(heartbeat.getTimestamp(), IUStatus.ENDED, IUEventType.NO_VALUE)); > out.collect(message); > LOGGER.info(message.getSessionInfo().toString()); > } > > // Update the state store. > isDisconnectedStateStore.update(Boolean.FALSE); > } > > > This produces the expected output. > Also, I will assume that this is the best way to solve my problem - I > can't use Flink's session windows. Let me know if anyone has any other > ideas though! > > Thank you for your time and quick response! > > > On Tue, Aug 11, 2020 at 1:45 PM Timo Walther <[hidden email] > <mailto:[hidden email]>> wrote: > > Hi Manas, > > at the first glance your code looks correct to me. I would investigate > if your keys and watermarks are correct. Esp. the watermark frequency > could be an issue. If watermarks are generated at the same time as the > heartbeats itself, it might be the case that the timers fire first > before the process() function is called which resets the timer. > > Maybe you can give us more information how watermarks are generated? > > Regards, > Timo > > On 11.08.20 08:33, Manas Kale wrote: > > Hi, > > I have a bunch of devices that keep sending heartbeat messages. I > want > > to make an operator that emits messages when a device disconnects > and > > when a device stops being disconnected. > > A device is considered disconnected if we don't receive any > heartbeat > > for more than some TIMEOUT duration. > > This seemed like a good candidate for session windows, but I am > not sure > > how I can express the inverse logic (i.e. detecting periods of > > inactivity instead of activity) using Flink's operators. > > I want to use event time for all processing and ideally want to > achieve > > this behaviour using a single operator. > > > > So I am trying to implement a custom processfunction that, on every > > heartbeat: > > > > * Deletes any previous event time timer > > * Registers a new timer to fire at heartbeat.timestamp + TIMEOUT > > > > The basic idea is that every new heartbeat will keep pushing the > timer > > forward. Only when heartbeats stop arriving does the timer fire, > > indicating the start of a disconnected state. > > Code: > > > > public class IUDisconnectedStateDetectorextends > KeyedProcessFunction<IUMonitorKey, IUHeartbeat, IUSessionMessage> { > > > > // Tracks if this monitor is disconnected or not. > > private ValueState<Boolean>isDisconnectedStateStore; > > // Tracks which timer was registered. > > private ValueState<Long>registeredTimerStateStore; > > > > private final LoggerLOGGER = > LoggerFactory.getLogger(IUDisconnectedStateDetector.class); > > > > // Called by the Flink runtime before starting this > operator. We > > initialize the state stores here. > > @Override > > public void open(Configuration parameters)throws Exception { > > isDisconnectedStateStore = > getRuntimeContext().getState(new ValueStateDescriptor<Boolean>( > > DISCONNECTED_STATE_STORE_NAME, Boolean.class)); > > registeredTimerStateStore = > getRuntimeContext().getState(new ValueStateDescriptor<Long>( > > REGISTERED_TIMER_STATE_STORE_NAME, Long.class)); > > } > > > > @Override > > public void processElement(IUHeartbeat heartbeat, Context ctx, > Collector<IUSessionMessage> out)throws Exception { > > Boolean isDisconnected =isDisconnectedStateStore.value(); > > LOGGER.info("Watermark: " + heartbeat +", isDisconnected > : " + isDisconnected > > +" last registered timer :" > +registeredTimerStateStore.value()); > > > > > > // If this is the first message for this monitor or is > the first message > > after a disconnection. > > if (isDisconnected ==null || isDisconnected == Boolean.TRUE) { > > // Delete previous timer. > > if (registeredTimerStateStore.value() !=null) > > > ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value()); > > > > // Register a timer that will fire in the future if > no further events > > are received. > > long timerFiringTimestamp = heartbeat.getTimestamp() > +DISCONNECTED_TIMEOUT; > > > ctx.timerService().registerEventTimeTimer(timerFiringTimestamp); > > registeredTimerStateStore.update(timerFiringTimestamp); > > > > // Emit a message indicating END of the disconnected > state. > > IUSessionMessage message =new IUSessionMessage( > > new > IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(),"dummy","dummy"), > > new IUSessionInfo(heartbeat.getTimestamp(), > IUStatus.ENDED, IUEventType.NO_VALUE)); > > out.collect(message); > > LOGGER.info(message.getSessionInfo().toString()); > > // Update the state store. > > isDisconnectedStateStore.update(Boolean.FALSE); > > } > > } > > > > > > @Override > > public void onTimer(long timestamp, OnTimerContext ctx, > Collector<IUSessionMessage> out)throws Exception { > > if (isDisconnectedStateStore.value() == Boolean.FALSE) { > > // If this timer fires that means no message was > received from the > > monitor for some timeout duration. > > // Update the state store. > > isDisconnectedStateStore.update(Boolean.TRUE); > > > > // Emit a message indicating START of the > disconnected state. Note that > > since this is applicable for a monitor, > > IUSessionMessage message =new IUSessionMessage( > > new > IUMonitorFeatureKey(ctx.getCurrentKey().getMonitorName(),"dummyFeatureName","dummyDeviceId"), > > new IUSessionInfo(timestamp, > IUStatus.STARTED, IUEventType.NO_VALUE)); > > out.collect(message); > > > > LOGGER.info(message.getSessionInfo().toString()); > > } > > } > > } > > > > *However, the above code does not behave as expected - the timer > fires > > even when (a) it has received heartbeats within the timeout and > (b) I > > have the code to delete it*. So, my questions: > > > > * Am I deleting the timer incorrectly? I use a state store to keep > > track of registered timer's timestamps and use that value > when deleting. > > * Am I overcomplicating things? Can this be achieved using Flink's > > inbuild session windowing operators? > > > > Thanks! > |
Free forum by Nabble | Edit this page |