Generate Timestamps and emit Watermarks - unordered events - Kafka source

5 messages
Generate Timestamps and emit Watermarks - unordered events - Kafka source

Luis Lázaro

Hi everyone,
I am working on a use case with CEP and Flink:

  • Flink 1.2
  • The source is Kafka, configured with a single partition.
  • The data are standard syslog messages parsed as LogEntry (an object with attributes like timestamp, service, severity, etc.).
  • An event is a LogEntry.
  • If two consecutive LogEntry events with severity ERROR (3) and the same service are matched within a 10-minute period, an ErrorAlert must be triggered.


Although I cannot guarantee the ascending order of events (LogEntry) when consuming from Kafka, I decided to try this implementation:


// My events provide their own timestamps
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// "Watermarks are generated inside the Kafka consumer, per Kafka partition":
val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new FlinkKafkaConsumer08[LogEntry](
  parameterTool.getRequired("topic"),
  new LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
  parameterTool.getProperties)

// NOTE: the input may not be in ascending timestamp order
val kafkaSourceWithTimestamps = kafkaSource.assignTimestampsAndWatermarks(
  new AscendingTimestampExtractor[LogEntry] {
    override def extractAscendingTimestamp(t: LogEntry): Long =
      ProcessorHelper.toTimestamp(t.timestamp).getTime
  })

val stream: DataStream[LogEntry] = env.addSource(kafkaSourceWithTimestamps)
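To convince myself why an ascending extractor is a bad fit here, I wrote this minimal plain-Scala sketch (no Flink; the sample timestamps are made up) of the monotonicity assumption AscendingTimestampExtractor relies on:

```scala
// Hypothetical epoch-millisecond timestamps, deliberately out of order.
val timestamps = Seq(1000L, 2000L, 1500L, 3000L)

// AscendingTimestampExtractor assumes each timestamp is >= the running maximum;
// collect the elements that violate that assumption.
var currentMax = Long.MinValue
val violations = timestamps.filter { ts =>
  val violated = ts < currentMax
  currentMax = math.max(currentMax, ts)
  violated
}

println(violations)  // List(1500)
```

With unordered input, every such violation is an element whose timestamp is already behind the watermark the extractor has emitted.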

I implemented a pattern like:

val myPattern =
  Pattern
    .begin[LogEntry]("First Event")
    .subtype(classOf[LogEntry])
    .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
    .next("Second Event")
    .subtype(classOf[LogEntry])
    .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
    .within(Time.minutes(10))

This pattern triggers an alert when two consecutive LogEntry events with severity ERROR and the same service occur (alerts are generated for each service individually):

CEP.pattern(stream.keyBy(_.service), myPattern)
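Conceptually (leaving Flink and CEP aside), what I want the pattern to express is something like this self-contained sketch; Entry and the sample data are hypothetical stand-ins for my LogEntry stream, and for simplicity it sorts each service's entries by timestamp first:

```scala
// Hypothetical, simplified model of the CEP pattern: for each service,
// flag consecutive ERROR entries whose timestamps are within 10 minutes.
case class Entry(service: String, severity: Int, ts: Long) // ts in millis

val ERROR = 3
val tenMinutesMs = 10 * 60 * 1000L

val entries = Seq(
  Entry("service_1", ERROR, 0L),
  Entry("service_1", ERROR, 5 * 60 * 1000L),  // 5 min apart  -> alert
  Entry("service_2", ERROR, 0L),
  Entry("service_2", ERROR, 20 * 60 * 1000L)  // 20 min apart -> no alert
)

val alerts = entries.groupBy(_.service).toSeq.flatMap { case (_, es) =>
  es.sortBy(_.ts).sliding(2).collect {
    case Seq(a, b) if a.severity == ERROR && b.severity == ERROR &&
                      b.ts - a.ts <= tenMinutesMs => (a, b)
  }
}

println(alerts.size)  // 1: only service_1's pair falls within the window
```

The real job of course cannot sort an unbounded stream, which is exactly why the timestamp/watermark assignment matters.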


An alert is made of two LogEntry events:

ErrorAlert:
service_name-ERROR-timestamp first event
service_name-ERROR-timestamp second event

I am getting alerts like this:

ErrorAlert:
service_2-3-2017-04-19 06:57:49
service_2-3-2017-04-19 07:02:23

ErrorAlert:
service_2-3-2017-04-19 07:32:37
service_2-3-2017-04-19 07:34:06

ErrorAlert:
service_1-3-2017-04-19 07:25:04
service_1-3-2017-04-19 07:29:39

ErrorAlert:
service_1-3-2017-04-19 07:29:39
service_1-3-2017-04-19 07:30:37

ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> oops!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> oops!

ErrorAlert:
service_2-3-2017-04-19 06:54:48
service_2-3-2017-04-19 06:55:03

ErrorAlert:
service_3-3-2017-04-19 07:21:11
service_3-3-2017-04-19 07:24:52

ErrorAlert:
service_1-3-2017-04-19 07:30:05
service_1-3-2017-04-19 07:31:33

ErrorAlert:
service_3-3-2017-04-19 07:08:31
service_3-3-2017-04-19 07:17:42

ErrorAlert:
service_1-3-2017-04-19 07:02:30
service_1-3-2017-04-19 07:06:58

ErrorAlert:
service_3-3-2017-04-19 07:03:50
service_3-3-2017-04-19 07:11:48

ErrorAlert:
service_3-3-2017-04-19 07:19:04
service_3-3-2017-04-19 07:21:25

ErrorAlert:
service_3-3-2017-04-19 07:33:13
service_3-3-2017-04-19 07:38:47


I also tried this approach:

kafkaSource.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
    override def extractTimestamp(t: LogEntry): Long =
      ProcessorHelper.toTimestamp(t.timestamp).getTime
  })

Time.seconds(0) —> if I set it like this, do I prevent events from being delivered late?
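For reference, my understanding (an assumption on my part, not taken from the source) is that this extractor's watermark is simply the maximum timestamp seen so far minus the allowed out-of-orderness, roughly:

```scala
// Sketch of BoundedOutOfOrdernessTimestampExtractor's watermark logic:
// watermark = max timestamp seen so far - allowed out-of-orderness.
def watermark(maxTsSeen: Long, maxOutOfOrdernessMs: Long): Long =
  maxTsSeen - maxOutOfOrdernessMs

// With Time.seconds(0) the watermark tracks the max timestamp exactly,
// so any event older than the max seen so far is already "late".
val maxSeen = 10000L
println(watermark(maxSeen, 0L))           // 10000
println(9000L <= watermark(maxSeen, 0L))  // true: an event at 9000 ms is late
```

So a zero bound does not reorder anything; it only declares every out-of-order event late.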

But I get the same problem as described above:

……
ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> oops!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> oops!
…...

Initially I thought my pattern was not correctly implemented, but the problem seems to be that I am unable to assign timestamps, and consequently emit watermarks, properly when events are unordered.

Any suggestion is appreciated; thanks in advance.


Best regards, Luis


Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source

Aljoscha Krettek
+Kostas and +Dawid

Could you please have a look? You two have worked on these parts most recently. I recall that there were some problems with event time and out-of-order processing in CEP in Flink 1.2.

Best,
Aljoscha
On 19. Apr 2017, at 15:28, Luis Lázaro <[hidden email]> wrote:


Hi everyone, 
i am working on a use case  with CEP and Flink:

  • Flink 1.2
  • Source is Kafka configured with one single partition.
  • Data are syslog standard messages parsed as LogEntry (object with attributes like timestamp, service, severity, etc)
  • An event is a LogEntry.
  • If two consecutives LogEntry with severity ERROR (3) and same service are matched in 10 minutes period, an ErrorAlert must be triggered.


Allthough i cannot warrant the ascending order of events (LogEntry) when consuming from kafka, i decided to try this implementation:


//My events provide its own timestamps
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

//"Watermarks are generated inside the Kafka consumer, per Kafka partition":
val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new FlinkKafkaConsumer08[LogEntry](
      parameterTool.getRequired("topic"), new LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
      parameterTool.getProperties)

//may not be ascending order
val kafkaSourceAssignedTimesTamp = kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[LogEntry] {
      override def extractAscendingTimestamp(t: LogEntry): Long = {
        ProcessorHelper.toTimestamp(t.timestamp).getTime
      }
    })

val stream: DataStream[LogEntry] = env.addSource(kafkaSourceAssignedTimesTamp)

 I implemented a pattern like:

myPattern = 
 Pattern
      .begin[LogEntry]("First Event")
      .subtype(classOf[LogEntry])
      .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
      .next("Second Event")
      .subtype(classOf[LogEntry])
      .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
      .within(Time.minutes(10))
  }

  This pattern will trigger alert when two consecutives LogEntry with severity ERROR and with same service (it will be generate alerts for each service individually)

  CEP.pattern(stream
.keyBy(_.service),
     myPattern)


An alert is made of two logEntry:

ErrorAlert:
service_name-ERROR-timestamp first event
service_name-ERROR-timestamp second event

I am getting alerts like this:

ErrorAlert:
service_2-3-2017-04-19 06:57:49
service_2-3-2017-04-19 07:02:23

ErrorAlert:
service_2-3-2017-04-19 07:32:37
service_2-3-2017-04-19 07:34:06

ErrorAlert:
service_1-3-2017-04-19 07:25:04
service_1-3-2017-04-19 07:29:39

ErrorAlert:
service_1-3-2017-04-19 07:29:39
service_1-3-2017-04-19 07:30:37

ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> ups!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> ups!

ErrorAlert:
service_2-3-2017-04-19 06:54:48
service_2-3-2017-04-19 06:55:03

ErrorAlert:
service_3-3-2017-04-19 07:21:11
service_3-3-2017-04-19 07:24:52

ErrorAlert:
service_1-3-2017-04-19 07:30:05
service_1-3-2017-04-19 07:31:33

ErrorAlert:
service_3-3-2017-04-19 07:08:31
service_3-3-2017-04-19 07:17:42

ErrorAlert:
service_1-3-2017-04-19 07:02:30
service_1-3-2017-04-19 07:06:58

ErrorAlert:
service_3-3-2017-04-19 07:03:50
service_3-3-2017-04-19 07:11:48

ErrorAlert:
service_3-3-2017-04-19 07:19:04
service_3-3-2017-04-19 07:21:25

ErrorAlert:
service_3-3-2017-04-19 07:33:13
service_3-3-2017-04-19 07:38:47


I also tried this approach:

kafkaSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
      override def extractTimestamp(t: LogEntry): Long = {
        ProcessorHelper.toTimestamp(t.timestamp).getTime
      }
    })

Time.seconds(0) —> if i set like this, do i prevent the events from being delivered with delayed ?

But i get the same problem as decribed above:

……
ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> ups!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> ups!
…...

Initially i thought my pattern was not correctly implemented but the problem seems to be i am unable to assign timestamp and consequently emit watermark properly when events are unordered.

Any sugestion is well apreciated, thanks in advance.


Best regards, Luis



Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source

Kostas Kloudas
In reply to this post by Luis Lázaro
Hi Luis and Aljoscha,

In Flink 1.2, late events were not dropped; they were processed as normal ones.
This is fixed for Flink 1.3 with https://issues.apache.org/jira/browse/FLINK-6205.
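In other words (a minimal sketch, not the actual CEP internals): an element whose timestamp is not ahead of the current watermark is late, and with that fix CEP drops such elements instead of processing them:

```scala
// Simplified lateness check: an element is late if its timestamp does not
// advance past the current watermark.
def isLate(elementTs: Long, currentWatermark: Long): Boolean =
  elementTs <= currentWatermark

println(isLate(1000L, 2000L))  // true:  dropped by CEP in Flink 1.3
println(isLate(3000L, 2000L))  // false: processed normally
```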

I would recommend switching to the master branch (this will become the upcoming Flink 1.3 release) and trying it out to see if everything works as expected.

CEP in Flink 1.3 will come with richer patterns and a lot of bug fixes, and by trying it out you will also help us stabilize it further before the official release.

Thanks a lot,
Kostas

On Apr 19, 2017, at 3:28 PM, Luis Lázaro <[hidden email]> wrote:


Hi everyone, 
i am working on a use case  with CEP and Flink:

  • Flink 1.2
  • Source is Kafka configured with one single partition.
  • Data are syslog standard messages parsed as LogEntry (object with attributes like timestamp, service, severity, etc)
  • An event is a LogEntry.
  • If two consecutives LogEntry with severity ERROR (3) and same service are matched in 10 minutes period, an ErrorAlert must be triggered.


Allthough i cannot warrant the ascending order of events (LogEntry) when consuming from kafka, i decided to try this implementation:


//My events provide its own timestamps
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

//"Watermarks are generated inside the Kafka consumer, per Kafka partition":
val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new FlinkKafkaConsumer08[LogEntry](
      parameterTool.getRequired("topic"), new LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
      parameterTool.getProperties)

//may not be ascending order
val kafkaSourceAssignedTimesTamp = kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[LogEntry] {
      override def extractAscendingTimestamp(t: LogEntry): Long = {
        ProcessorHelper.toTimestamp(t.timestamp).getTime
      }
    })

val stream: DataStream[LogEntry] = env.addSource(kafkaSourceAssignedTimesTamp)

 I implemented a pattern like:

myPattern = 
 Pattern
      .begin[LogEntry]("First Event")
      .subtype(classOf[LogEntry])
      .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
      .next("Second Event")
      .subtype(classOf[LogEntry])
      .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
      .within(Time.minutes(10))
  }

  This pattern will trigger alert when two consecutives LogEntry with severity ERROR and with same service (it will be generate alerts for each service individually)

  CEP.pattern(stream
.keyBy(_.service),
     myPattern)


An alert is made of two logEntry:

ErrorAlert:
service_name-ERROR-timestamp first event
service_name-ERROR-timestamp second event

I am getting alerts like this:

ErrorAlert:
service_2-3-2017-04-19 06:57:49
service_2-3-2017-04-19 07:02:23

ErrorAlert:
service_2-3-2017-04-19 07:32:37
service_2-3-2017-04-19 07:34:06

ErrorAlert:
service_1-3-2017-04-19 07:25:04
service_1-3-2017-04-19 07:29:39

ErrorAlert:
service_1-3-2017-04-19 07:29:39
service_1-3-2017-04-19 07:30:37

ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> ups!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> ups!

ErrorAlert:
service_2-3-2017-04-19 06:54:48
service_2-3-2017-04-19 06:55:03

ErrorAlert:
service_3-3-2017-04-19 07:21:11
service_3-3-2017-04-19 07:24:52

ErrorAlert:
service_1-3-2017-04-19 07:30:05
service_1-3-2017-04-19 07:31:33

ErrorAlert:
service_3-3-2017-04-19 07:08:31
service_3-3-2017-04-19 07:17:42

ErrorAlert:
service_1-3-2017-04-19 07:02:30
service_1-3-2017-04-19 07:06:58

ErrorAlert:
service_3-3-2017-04-19 07:03:50
service_3-3-2017-04-19 07:11:48

ErrorAlert:
service_3-3-2017-04-19 07:19:04
service_3-3-2017-04-19 07:21:25

ErrorAlert:
service_3-3-2017-04-19 07:33:13
service_3-3-2017-04-19 07:38:47


I also tried this approach:

kafkaSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
      override def extractTimestamp(t: LogEntry): Long = {
        ProcessorHelper.toTimestamp(t.timestamp).getTime
      }
    })

Time.seconds(0) —> if i set like this, do i prevent the events from being delivered with delayed ?

But i get the same problem as decribed above:

……
ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> ups!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> ups!
…...

Initially i thought my pattern was not correctly implemented but the problem seems to be i am unable to assign timestamp and consequently emit watermark properly when events are unordered.

Any sugestion is well apreciated, thanks in advance.


Best regards, Luis



Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source

Luis Lázaro
Hi Aljoscha and Kostas, thanks for your help.

Kostas, I followed your recommendation and it seems to be working fine.

I did:
- upgraded to 1.3-SNAPSHOT from the master branch.
- tried assigning timestamps and emitting watermarks with AscendingTimestampExtractor: the alerts are correct (late events are no longer processed as normal ones), and I get a lot of warnings about violated ascending monotonicity (that's OK, my events are not ordered in time).
- tried assigning timestamps and emitting watermarks with BoundedOutOfOrdernessTimestampExtractor: the alerts are correct.


Thanks a lot,
best regards, Luis.




Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source

Kostas Kloudas
Perfect!

Thanks a lot for testing it, Luis!
And keep us posted if you find anything else.
As you may have seen, the CEP library is undergoing heavy refactoring for the upcoming release.

Kostas

On Apr 25, 2017, at 12:30 PM, Luis Lázaro <[hidden email]> wrote:
