Hi everyone, I am working on a use case with CEP and Flink.

Although I cannot guarantee the ascending order of events (LogEntry) when consuming from Kafka, I decided to try this implementation:

    // My events provide their own timestamps
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // "Watermarks are generated inside the Kafka consumer, per Kafka partition":
    val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new FlinkKafkaConsumer08[LogEntry](
      parameterTool.getRequired("topic"),
      new LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
      parameterTool.getProperties)

    // Events may not be in ascending order
    val kafkaSourceAssignedTimesTamp = kafkaSource.assignTimestampsAndWatermarks(
      new AscendingTimestampExtractor[LogEntry] {
        override def extractAscendingTimestamp(t: LogEntry): Long = {
          ProcessorHelper.toTimestamp(t.timestamp).getTime
        }
      })

    val stream: DataStream[LogEntry] = env.addSource(kafkaSourceAssignedTimesTamp)

I implemented a pattern like:

    myPattern = Pattern
      .begin[LogEntry]("First Event")
      .subtype(classOf[LogEntry])
      .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
      .next("Second Event")
      .subtype(classOf[LogEntry])
      .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
      .within(Time.minutes(10))

This pattern triggers an alert when two consecutive LogEntry events have severity ERROR and the same service (alerts are generated for each service individually):

    CEP.pattern(stream.keyBy(_.service), myPattern)

An alert is made of two LogEntry events:

    ErrorAlert:
    service_name-ERROR-timestamp first event
    service_name-ERROR-timestamp second event

I am getting alerts like this:

    ErrorAlert: service_2-3-2017-04-19 06:57:49 service_2-3-2017-04-19 07:02:23
    ErrorAlert: service_2-3-2017-04-19 07:32:37 service_2-3-2017-04-19 07:34:06
    ErrorAlert: service_1-3-2017-04-19 07:25:04 service_1-3-2017-04-19 07:29:39
    ErrorAlert: service_1-3-2017-04-19 07:29:39 service_1-3-2017-04-19 07:30:37
    ErrorAlert: service_3-3-2017-04-19 07:49:27 service_3-3-2017-04-19 06:59:10 ---> oops!
    ErrorAlert: service_2-3-2017-04-19 07:50:06 service_2-3-2017-04-19 06:54:48 ---> oops!
    ErrorAlert: service_2-3-2017-04-19 06:54:48 service_2-3-2017-04-19 06:55:03
    ErrorAlert: service_3-3-2017-04-19 07:21:11 service_3-3-2017-04-19 07:24:52
    ErrorAlert: service_1-3-2017-04-19 07:30:05 service_1-3-2017-04-19 07:31:33
    ErrorAlert: service_3-3-2017-04-19 07:08:31 service_3-3-2017-04-19 07:17:42
    ErrorAlert: service_1-3-2017-04-19 07:02:30 service_1-3-2017-04-19 07:06:58
    ErrorAlert: service_3-3-2017-04-19 07:03:50 service_3-3-2017-04-19 07:11:48
    ErrorAlert: service_3-3-2017-04-19 07:19:04 service_3-3-2017-04-19 07:21:25
    ErrorAlert: service_3-3-2017-04-19 07:33:13 service_3-3-2017-04-19 07:38:47

I also tried this approach:

    kafkaSource.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
        override def extractTimestamp(t: LogEntry): Long = {
          ProcessorHelper.toTimestamp(t.timestamp).getTime
        }
      })

If I set Time.seconds(0) like this, do I prevent events from being delivered late?

But I get the same problem as described above:

    ...
    ErrorAlert: service_3-3-2017-04-19 07:49:27 service_3-3-2017-04-19 06:59:10 ---> oops!
    ErrorAlert: service_2-3-2017-04-19 07:50:06 service_2-3-2017-04-19 06:54:48 ---> oops!
    ...

Initially I thought my pattern was not correctly implemented, but the problem seems to be that I am unable to assign timestamps, and consequently emit watermarks, properly when events are unordered.

Any suggestion is much appreciated, thanks in advance.

Best regards,
Luis
+Kostas and +Dawid
Could you please have a look? You two have worked in these parts most recently. I recall that there were some problems when it comes to event time and out-of-order processing in CEP in Flink 1.2.

Best,
Aljoscha
Hi Luis and Aljoscha,
In Flink-1.2 late events were not dropped, but they were processed as normal ones.
This is fixed for Flink-1.3 with https://issues.apache.org/jira/browse/FLINK-6205.

I would recommend switching to the master branch (this will be the upcoming Flink-1.3 release) and trying it out to see if everything works as expected. CEP in Flink-1.3 will come with richer patterns and a lot of bug fixes, and by trying it out you will also help us stabilize it even further before its official release.

Thanks a lot,
Kostas
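To make the notion of a "late" event concrete, here is a minimal sketch in plain Scala with no Flink dependency. It is a simplified model and not Flink's implementation: it assumes the watermark trails the highest timestamp seen so far by a fixed bound and is updated on every event (Flink emits watermarks periodically), and it treats an event as late when its timestamp is not above the current watermark. The names Event, LateEventModel and partitionLate are hypothetical.

```scala
// Hypothetical event type for illustration; timestamps are plain Longs (e.g. seconds).
final case class Event(service: String, timestamp: Long)

object LateEventModel {
  /**
   * Splits events, in arrival order, into (accepted, late).
   * Assumption: watermark = highest timestamp seen so far - maxOutOfOrderness,
   * updated after every event. An event is late if timestamp <= current watermark.
   */
  def partitionLate(events: Seq[Event], maxOutOfOrderness: Long): (Vector[Event], Vector[Event]) = {
    val init = (Long.MinValue, Vector.empty[Event], Vector.empty[Event])
    val (_, accepted, late) = events.foldLeft(init) {
      case ((watermark, acc, dropped), e) =>
        // The watermark never moves backwards.
        val nextWatermark = math.max(watermark, e.timestamp - maxOutOfOrderness)
        if (e.timestamp <= watermark) (nextWatermark, acc, dropped :+ e)
        else (nextWatermark, acc :+ e, dropped)
    }
    (accepted, late)
  }
}
```

In this model, take the two service_3 timestamps from the alert marked above, expressed as seconds of the day: 07:49:27 is 28167 and 06:59:10 is 25150. With a bound of 0 the second event is classified as late, while with a bound of one hour (3600) it is accepted. In other words, a bound of 0 does not stop events from arriving out of order; it only declares every out-of-order event late.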
Hi Aljoscha and Kostas, thanks in advance.

Kostas, I followed your recommendation and it seems to be working fine. I did the following:

- Upgraded to 1.3-SNAPSHOT from the master branch.
- Tried assigning timestamps and emitting watermarks using AscendingTimestampExtractor: alerts are correct (late events are no longer processed as normal ones), and I get a lot of warnings about violated ascending monotony (that is OK, my events are not ordered in time).
- Tried assigning timestamps and emitting watermarks using BoundedOutOfOrdernessTimestampExtractor: alerts are correct.

Thanks a lot, best regards,
Luis.
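The monotony warnings mentioned above can be pictured with a small plain-Scala sketch. It is only a model under a stated assumption: an ascending-timestamp extractor remembers the highest timestamp seen so far and reports any element whose timestamp falls below it. The names AscendingModel and monotonyViolations are hypothetical and are not part of Flink.

```scala
object AscendingModel {
  /**
   * Returns, in arrival order, the timestamps that are lower than the
   * highest timestamp seen before them (the assumed "monotony violations").
   */
  def monotonyViolations(timestamps: Seq[Long]): Vector[Long] = {
    val (_, violations) = timestamps.foldLeft((Long.MinValue, Vector.empty[Long])) {
      case ((maxSeen, found), ts) =>
        if (ts >= maxSeen) (ts, found)   // in order: advance the maximum
        else (maxSeen, found :+ ts)      // out of order: record a violation
    }
    violations
  }
}
```

For an unordered topic like the one in this thread, every event that arrives behind an earlier, newer event would be counted, which is consistent with seeing many such warnings.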
Perfect!
Thanks a lot for testing it, Luis! And keep us posted if you find anything else. As you may have seen, the CEP library is undergoing heavy refactoring for the upcoming release.

Kostas