Apache Flink CEP how to detect if event did not occur within x seconds?

Spico Florin
Hello!
I have the same question as in this Stack Overflow post https://stackoverflow.com/questions/50356981/apache-flink-cep-how-to-detect-if-event-did-not-occur-within-x-seconds?rq=1, because I need the same functionality.

My use case is to detect when a device is disconnected from a server (for example).
I send the SNMP data to Kafka and then to Flink CEP, which checks whether the connection has been established within X seconds.
If the X-second window times out, I can consider the device disconnected.
The issue is that once the device is disconnected, SNMP no longer sends any data about the port that was in use, so no event reaches CEP. The disconnection event is only triggered once the device connects again.


I checked the status of the JIRA ticket for this feature:


but unfortunately, it is still open.

In the official documentation, https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html, the paragraph on idling sources describes a situation that resembles what we need:
"Currently, with pure event time watermarks generators, watermarks can not progress if there are no elements to be processed. That means in case of gap in the incoming data, event time will not progress and for example the window operator will not be triggered and thus existing windows will not be able to produce any output data. To circumvent this one can use periodic watermark assigners that don’t only assign based on element timestamps. An example solution could be an assigner that switches to using current processing time as the time basis after not observing new events for a while."

Unfortunately, there is no example of what such an assigner looks like. A full example would help us check whether this solution works.

Therefore, could you please share your thoughts on this feature?
Is it feasible? If yes, please tell us which Flink version supports it and provide an example.
Or is it still a limitation with no plan to solve it in the near future?

I look forward to your answers.

Best regards,
Florin


Re: Apache Flink CEP how to detect if event did not occur within x seconds?

Andrey Zagrebin
Hi Florin,

I think Dawid might help you. I am pulling him into the discussion.

Best,
Andrey

In reply to this post by Spico Florin (12 Dec 2018, 16:24)

Re: Apache Flink CEP how to detect if event did not occur within x seconds?

fudian.fd
Hi Florin,

Are you using processing time or event time? FLINK-7384 makes it possible to emit timed-out patterns without waiting for the next element, but ONLY in processing time. In event time, the watermark is still what triggers the emission of matched or timed-out patterns. Besides, the watermark is not tracked per key: all keys handled by an operator share the same event-time clock. In your application, SNMP stops sending data for a device once it is disconnected, but if many devices are monitored, the watermark will still progress as long as some of them remain connected. If no elements arrive at all, the assigner mentioned in the documentation could look like the following:

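import javax.annotation.Nullable;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Each element is assumed to be its own event timestamp (epoch milliseconds).
class PeriodicExtractor implements AssignerWithPeriodicWatermarks<Long> {

   private static final long MAX_IDLE = 5 * 60 * 1000; // 5 minutes
   private static final long OFFSET = 30 * 1000;       // 30 seconds

   // Highest event timestamp seen so far (or the processing-time fallback).
   private volatile long maxTimestamp = Long.MIN_VALUE;
   // Wall-clock time of the last element; -1 until the first watermark request.
   private volatile long lastTimeReceivedElement = -1L;

   @Override
   public long extractTimestamp(Long element, long previousElementTimestamp) {
      maxTimestamp = Math.max(maxTimestamp, element);
      lastTimeReceivedElement = System.currentTimeMillis();
      return element;
   }

   @Nullable
   @Override
   public Watermark getCurrentWatermark() {
      long now = System.currentTimeMillis();
      if (lastTimeReceivedElement < 0) {
         // Start the idle timer when the task starts, not at epoch 0,
         // so a backlog is not declared late before it is even read.
         lastTimeReceivedElement = now;
      }
      if (now - lastTimeReceivedElement >= MAX_IDLE) {
         // No input for MAX_IDLE: let event time follow processing time minus OFFSET.
         maxTimestamp = Math.max(maxTimestamp, now - OFFSET);
      }
      return new Watermark(maxTimestamp);
   }
}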
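
To check how this idle-switching rule behaves without running a job, here is a plain-Java model of the same logic. It is not a Flink class: the name IdleAwareWatermarkModel and the injected clock (a LongSupplier standing in for System.currentTimeMillis()) are assumptions made for illustration, and in this model the idle timer starts when the object is created.

```java
import java.util.function.LongSupplier;

/**
 * Plain-Java model (NOT a Flink class) of an assigner that tracks the highest
 * event timestamp and, after maxIdleMs of wall-clock silence, lets the
 * watermark follow "now - offsetMs". The clock is injected for testability.
 */
public class IdleAwareWatermarkModel {
    private final long maxIdleMs;
    private final long offsetMs;
    private final LongSupplier clock;   // hypothetical stand-in for System::currentTimeMillis
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastElementAt;

    public IdleAwareWatermarkModel(long maxIdleMs, long offsetMs, LongSupplier clock) {
        this.maxIdleMs = maxIdleMs;
        this.offsetMs = offsetMs;
        this.clock = clock;
        this.lastElementAt = clock.getAsLong(); // idle timer starts at creation
    }

    /** Called for every element; the element is its own event timestamp. */
    public long onElement(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastElementAt = clock.getAsLong();
        return eventTimestamp;
    }

    /** Called periodically; returns the watermark that would be emitted. */
    public long currentWatermark() {
        long now = clock.getAsLong();
        if (now - lastElementAt >= maxIdleMs) {
            // Idle: advance event time using processing time minus the offset.
            maxTimestamp = Math.max(maxTimestamp, now - offsetMs);
        }
        return maxTimestamp; // never decreases
    }

    public static void main(String[] args) {
        long[] now = {1_000_000L}; // fake wall clock in ms
        IdleAwareWatermarkModel wm =
                new IdleAwareWatermarkModel(5 * 60 * 1000, 30 * 1000, () -> now[0]);

        wm.onElement(999_000L);
        System.out.println(wm.currentWatermark()); // 999000 (event-time driven)

        now[0] += 60 * 1000;                       // 1 minute of silence
        System.out.println(wm.currentWatermark()); // 999000 (not idle yet)

        now[0] += 5 * 60 * 1000;                   // 6 minutes of silence in total
        System.out.println(wm.currentWatermark()); // 1330000 (now - 30 s)
    }
}
```

One consequence of this design: for a completely silent stream, a pending timeout can fire no earlier than MAX_IDLE after the last element, and events that later arrive with timestamps at or below the advanced watermark are treated as late.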

Regards,
Dian

In reply to this post by Andrey Zagrebin (13 Dec 2018, 00:06)

Re: Apache Flink CEP how to detect if event did not occur within x seconds?

Spico Florin
Hello, Dian!

Thank you very much for your explanations. In my case, the CEP patterns are based on event time. Also, as you said, I have many devices, each with many ports, so I'm using keyed streams.
I would like to know which device was disconnected and on which port.
Is that feasible in Flink? If yes, in which version, and how?

Regards,
Florin


In reply to this post by fudian.fd (Thu, 13 Dec 2018, 04:37)

Re: Apache Flink CEP how to detect if event did not occur within x seconds?

Dawid Wysakowicz-2
In reply to this post by fudian.fd

Hi Florin,

I concur with Dian. If you have any other questions, please do not hesitate to ask.

Best,

Dawid


Re: Apache Flink CEP how to detect if event did not occur within x seconds?

Dawid Wysakowicz-2
In reply to this post by Spico Florin

Hello once again,

If you want to use the CEP library, you can, for example, key the stream by device and then apply a pattern like this:

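Pattern<Event, ?> pattern = Pattern.<Event>begin("connect").where(...)
    .followedBy("established").where(new IterativeCondition<Event>() {
        @Override
        public boolean filter(Event value, Context<Event> ctx) throws Exception {
            // Accept an "established" event only if it is for the same port as the "connect" event.
            return ctx.getEventsForPattern("connect").iterator().next().getPort().equals(value.getPort());
        }
    })
    // A "connect" with no matching "established" within X ms becomes a timed-out partial match.
    .within(Time.milliseconds(X));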

Then you can select only the timed-out partial matches [1].

The newest feature this example relies on is IterativeCondition, which was introduced in Flink 1.3.

If you don't like the CEP API, you can implement the same logic with a ProcessFunction [2] and register timers on the connect event yourself.
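
The ProcessFunction alternative can be sketched as a plain-Java model (not Flink code) of what a function keyed by device and port would do with event-time timers: a "connect" event registers a timer at timestamp + X, an "established" event for the same key before that deadline cancels it, and any timer the watermark passes reports the key as disconnected. The class DisconnectDetectorModel, the event type strings and the "device:port" key format are hypothetical. In a real job the deadline would live in keyed state, the timer would be managed with ctx.timerService().registerEventTimeTimer(...) and deleteEventTimeTimer(...), and the report would be emitted from onTimer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model (NOT Flink code) of per-key event-time timeouts.
 * Keys are "device:port"; each key holds at most one pending deadline,
 * mimicking one event-time timer per key. All names are hypothetical.
 */
public class DisconnectDetectorModel {
    private final long timeoutMs;                                // the "X" from the thread
    private final Map<String, Long> deadlines = new HashMap<>(); // key -> timer timestamp
    private long watermark = Long.MIN_VALUE;

    public DisconnectDetectorModel(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Processes one event; type is "connect" or "established" (assumed event kinds). */
    public void onEvent(String device, int port, String type, long timestamp) {
        String key = device + ":" + port;
        if ("connect".equals(type)) {
            // like registering an event-time timer at timestamp + X
            deadlines.put(key, timestamp + timeoutMs);
        } else if ("established".equals(type)) {
            Long deadline = deadlines.get(key);
            if (deadline != null && timestamp < deadline) {
                // established in time: like deleting the pending timer
                deadlines.remove(key);
            }
        }
    }

    /** Advances the shared watermark and returns the keys whose timers fire, sorted. */
    public List<String> onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark); // watermarks never go backwards
        List<String> fired = new ArrayList<>();
        // An event-time timer fires once the watermark reaches its timestamp.
        deadlines.entrySet().removeIf(e -> {
            if (e.getValue() <= watermark) {
                fired.add(e.getKey());
                return true;
            }
            return false;
        });
        Collections.sort(fired);
        return fired;
    }

    public static void main(String[] args) {
        DisconnectDetectorModel detector = new DisconnectDetectorModel(10_000);
        detector.onEvent("devA", 1, "connect", 1_000);
        detector.onEvent("devB", 1, "connect", 2_000);
        detector.onEvent("devA", 1, "established", 3_000);
        // devA keeps sending data, so the shared watermark keeps advancing
        // even though devB has gone silent.
        System.out.println(detector.onWatermark(5_000));  // []
        System.out.println(detector.onWatermark(12_000)); // [devB:1]
    }
}
```

Because the watermark is shared across keys, devB's timeout fires as soon as other devices' data pushes the watermark past its deadline. If every device goes quiet at once, an idle-aware assigner such as PeriodicExtractor is still needed to move event time forward.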

You can find similar solutions in these training exercises: http://training.data-artisans.com/exercises/rideCleansing.html and http://training.data-artisans.com/exercises/CEP.html

Best,

Dawid


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/libs/cep.html#handling-timed-out-partial-patterns

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/process_function.html


In reply to this post by Spico Florin (13/12/2018 09:20)