Queries regarding FlinkCEP

Posted by Biplob Biswas on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Queries-regarding-FlinkCEP-tp13454.html

Hi ,

Thanks a lot for the help last time. I have a few more questions, and I chose to create a new topic since the problem in the previous topic was solved, thanks to useful input from the Flink community. The questions are as follows:

1. Which notion of time does the "within" operator work on: Event Time or Processing Time? I am asking because I want to know whether something like the following would be captured or not.

MaxOutOfOrderness is set to 10 minutes, and the "within" operator is specified as 5 minutes. So if the first event has an event time of 1:00 and the corresponding next event has an event time of 1:04 but arrives in the system at 1:06, would this still be processed and an alert generated?
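To make the scenario concrete, here is a tiny plain-Java sketch (no Flink involved; the constants mirror the scenario above, and the watermark arithmetic reflects my understanding of bounded out-of-orderness, not Flink's actual implementation):

```java
import java.time.Duration;

public class WithinWindowSketch {
    public static void main(String[] args) {
        long minute = Duration.ofMinutes(1).toMillis();

        long firstEventTime   = 60 * minute;   // first event at 1:00 (event time)
        long secondEventTime  = 64 * minute;   // second event at 1:04 (event time)
        long maxOutOfOrderness = 10 * minute;  // watermark lag
        long within            = 5 * minute;   // pattern window

        // In event time, the "within" window closes at firstEventTime + within = 1:05.
        long windowCloses = firstEventTime + within;

        // With bounded out-of-orderness, the watermark trails the highest
        // timestamp seen so far by maxOutOfOrderness. Assuming the 1:04 event
        // is the latest one seen, the watermark is only at 0:54.
        long watermark = secondEventTime - maxOutOfOrderness;

        boolean insideWindow    = secondEventTime <= windowCloses;
        boolean windowStillOpen = watermark < windowCloses;

        System.out.println("inside window: " + insideWindow);        // true
        System.out.println("window still open: " + windowStillOpen); // true
    }
}
```

If this arithmetic is right, the 1:04 event is still inside the 5-minute event-time window when it arrives at 1:06, as long as no event with a much later timestamp has already pushed the watermark past 1:05.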

2. What happens if I don't have a key to specify? The way two events are correlated is by using the ctx of the first event and matching on a different id, so we can't group by some unique field. I tried a test run without specifying a key and it apparently works, but how is the shuffling done in that case?

3. This is the major issue. I could use Event Time with an AscendingTimestampExtractor for one of my Kafka topics because its behavior is consistent. But when I added another topic to read from, where the events are not in ascending order, the AscendingTimestampExtractor gave me timestamp monotonicity violation warnings. When I switched to a BoundedOutOfOrdernessTimestampExtractor, the warnings went away, but I no longer get any alerts.

If I go back to using Processing Time, I get alerts properly again. What could be the problem here?
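For context, this is my understanding of how the bounded-out-of-orderness watermark advances, written as a simplified plain-Java sketch (a toy re-implementation for illustration, not the real Flink class):

```java
import java.util.concurrent.TimeUnit;

// Simplified sketch of BoundedOutOfOrdernessTimestampExtractor's watermark
// behavior: the watermark trails the maximum timestamp seen so far by a
// fixed delay, so it only advances when events with newer timestamps arrive.
public class WatermarkSketch {
    private final long maxOutOfOrderness;
    private long maxTimestampSeen = Long.MIN_VALUE;

    public WatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
    }

    // Record an event's timestamp and return the resulting watermark.
    public long onEvent(long eventTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
        return maxTimestampSeen - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(TimeUnit.SECONDS.toMillis(60));
        System.out.println(wm.onEvent(100_000)); // 40000
        System.out.println(wm.onEvent(90_000));  // 40000 (out-of-order event: watermark holds)
        System.out.println(wm.onEvent(200_000)); // 140000
    }
}
```

If this model is right, the watermark stalls whenever no newer events arrive, which I suspect matters for when CEP emits matches or timeouts in event time.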

This is the code I am using:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.types.Either;

public class CEPForBAM {

  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    System.out.println(env.getStreamTimeCharacteristic());
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(10000);

    // configure Kafka consumer
    Properties props = new Properties();
    props = getDefaultProperties(props);

    FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
            Arrays.asList("topic1", "topic_x", "topic_test"),
            new StringSerializerToEvent(),
            props);

    kafkaSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(60)) {

      private static final long serialVersionUID = -7228487240278428374L;

      @Override
      public long extractTimestamp(BAMEvent event) {
        return event.getTimestamp();
      }
    });

    DataStream<BAMEvent> events = env.addSource(kafkaSource);

    // Input stream of monitoring events


/*    DataStream<BAMEvent> partitionedInput = events
            .keyBy((KeySelector<BAMEvent, String>) BAMEvent::getId);*/

    events.print();
    //partitionedInput.print();

    Pattern<BAMEvent, ?> pattern = Pattern.<BAMEvent>begin("first")
            .where(new SimpleCondition<BAMEvent>() {
              private static final long serialVersionUID = 1390448281048961616L;

              @Override
              public boolean filter(BAMEvent event) throws Exception {
                return event.getEventName().equals(ReadEventType.class.getSimpleName());
              }
            })
            .followedBy("second")
            .where(new IterativeCondition<BAMEvent>() {
              private static final long serialVersionUID = -9216505110246259082L;

              @Override
              public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {

                if (secondEvent.getEventName().equals(StatusChangedEventType.class.getSimpleName())) {
                  for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
                    if (secondEvent.getCorrelationID().contains(firstEvent.getEventId()))
                      return true;
                  }
                }
                return false;
              }
            })
            .within(Time.minutes(10));

    PatternStream<BAMEvent> patternStream = CEP.pattern(events, pattern);


    DataStream<Either<String, String>> alerts = patternStream.select(new PatternTimeoutFunction<BAMEvent, String>() {
      private static final long serialVersionUID = -8717561187522704500L;

      @Override
      public String timeout(Map<String, List<BAMEvent>> map, long l) throws Exception {
        return "TimedOut: " + map.toString() + " @ " + l;
      }

    }, new PatternSelectFunction<BAMEvent, String>() {
      private static final long serialVersionUID = 3144439966791408980L;

      @Override
      public String select(Map<String, List<BAMEvent>> pattern) throws Exception {
        BAMEvent bamEvent = pattern.get("first").get(0);
        return "Matched Events: " + bamEvent.getEventId() + "_" + bamEvent.getEventName();
      }
    });

    alerts.print();

    env.execute("CEP monitoring job");
  }
}



Even when I am using Event Time, I am getting events from Kafka, as shown by events.print().