Hi,
Thanks a lot for the help last time. I have a few more questions, and I chose
to create a new topic since the problem in the previous topic was solved,
thanks to useful inputs from the Flink community. The questions are as follows:
*1.* Which notion of time does the "within" operator work on: event time or
processing time? I am asking because I want to know whether something like
the following would be captured or not.
MaxOutOfOrderness is set to 10 minutes, and "within" is specified as
5 minutes. So if the first event has an event time of 1:00 and the
corresponding next event has an event time of 1:04 but arrives in the system
at 1:06, would this still be processed and an alert generated or not?
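For concreteness, here is a minimal sketch of that scenario (the extractor mirrors my settings; the watermark arithmetic in the comments is my understanding of how BoundedOutOfOrdernessTimestampExtractor behaves, not something I have verified):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Hypothetical extractor with maxOutOfOrderness = 10 min, as in my setup.
public class BAMEventExtractor extends BoundedOutOfOrdernessTimestampExtractor<BAMEvent> {
    public BAMEventExtractor() {
        super(Time.minutes(10));
    }

    @Override
    public long extractTimestamp(BAMEvent event) {
        // first event: event time 1:00; second event: event time 1:04,
        // arriving in the system at 1:06 processing time.
        return event.getTimestamp();
    }
}
// As I understand it, the emitted watermark trails the highest timestamp
// seen so far by 10 min, so after the 1:04 event the watermark is only
// around 0:54 and the 5-minute "within" window starting at 1:00 should
// not have closed yet.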
*2.* What would happen if I don't have a key to specify? The two events are
correlated by using the ctx of the first event and matching a different id,
so we can't group by some unique field. I tried a test run without
specifying a key and it apparently works, but how is the shuffling done in
that case? (A sketch of the two variants I have in mind follows.)
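This is roughly what my test looked like, using the same `events` stream and `pattern` as in the full code below; the constant-key variant is only my guess at how one might force all events onto a single parallel instance, not something I have from the docs:

// Variant 1: what I actually ran, CEP directly on the un-keyed stream.
PatternStream<BAMEvent> unkeyed = CEP.pattern(events, pattern);

// Variant 2 (assumption on my part): a constant key, so that all
// potentially matching events end up on the same parallel instance.
PatternStream<BAMEvent> constantKey = CEP.pattern(
        events.keyBy((KeySelector<BAMEvent, String>) event -> "same-key"),
        pattern);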
*3.* This is the major issue. I could use event time with the ascending
timestamp extractor for one of my Kafka topics because its behavior is
consistent. But when I added another topic to read from, where the events
are not in ascending order, the ascending timestamp extractor gave me
timestamp monotonicity violations. When I use
BoundedOutOfOrdernessTimestampExtractor for that topic instead, I no longer
get any warnings, but I also no longer get my alerts.
If I go back to using processing time, I am again getting alerts properly.
What could be the problem here?
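In case it helps, here is a sketch of the two extractor setups side by side. The per-topic consumers and the union are my own assumption about how one could mix them; in my actual job below I read all topics with a single consumer (`props` and `StringSerializerToEvent` are the same as in the full code):

import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

// Ordered topic: timestamps increase monotonically, so the ascending
// extractor is fine here.
FlinkKafkaConsumer010<BAMEvent> orderedSource =
        new FlinkKafkaConsumer010<>("topic1", new StringSerializerToEvent(), props);
orderedSource.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<BAMEvent>() {
            @Override
            public long extractAscendingTimestamp(BAMEvent event) {
                return event.getTimestamp();
            }
        });

// Unordered topic: allow up to 60 s of out-of-orderness instead.
FlinkKafkaConsumer010<BAMEvent> unorderedSource =
        new FlinkKafkaConsumer010<>("topic_x", new StringSerializerToEvent(), props);
unorderedSource.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(60)) {
            @Override
            public long extractTimestamp(BAMEvent event) {
                return event.getTimestamp();
            }
        });

DataStream<BAMEvent> events =
        env.addSource(orderedSource).union(env.addSource(unorderedSource));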
*This is the code I am using:*
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.types.Either;

public class CEPForBAM {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        System.out.println(env.getStreamTimeCharacteristic());
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(10000);

        // Configure Kafka consumer (getDefaultProperties and
        // StringSerializerToEvent are our own classes, defined elsewhere).
        Properties props = new Properties();
        props = getDefaultProperties(props);
        FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
                Arrays.asList("topic1", "topic_x", "topic_test"),
                new StringSerializerToEvent(),
                props);

        // Assign event-time timestamps, allowing up to 60 s of out-of-orderness.
        kafkaSource.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(60)) {
                    private static final long serialVersionUID = -7228487240278428374L;

                    @Override
                    public long extractTimestamp(BAMEvent event) {
                        return event.getTimestamp();
                    }
                });

        // Input stream of monitoring events
        DataStream<BAMEvent> events = env.addSource(kafkaSource);
        /* DataStream<BAMEvent> partitionedInput = events
                .keyBy((KeySelector<BAMEvent, String>) BAMEvent::getId); */
        events.print();
        // partitionedInput.print();

        // Match a "first" event followed by a "second" event that references
        // its id, both within 10 minutes.
        Pattern<BAMEvent, ?> pattern = Pattern.<BAMEvent>begin("first")
                .where(new SimpleCondition<BAMEvent>() {
                    private static final long serialVersionUID = 1390448281048961616L;

                    @Override
                    public boolean filter(BAMEvent event) throws Exception {
                        return event.getEventName().equals(ReadEventType.class.getSimpleName());
                    }
                })
                .followedBy("second")
                .where(new IterativeCondition<BAMEvent>() {
                    private static final long serialVersionUID = -9216505110246259082L;

                    @Override
                    public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {
                        if (secondEvent.getEventName().equals(StatusChangedEventType.class.getSimpleName())) {
                            for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
                                if (secondEvent.getCorrelationID().contains(firstEvent.getEventId())) {
                                    return true;
                                }
                            }
                        }
                        return false;
                    }
                })
                .within(Time.minutes(10));

        PatternStream<BAMEvent> patternStream = CEP.pattern(events, pattern);

        // Left side of the Either = timed-out partial matches,
        // right side = completed matches.
        DataStream<Either<String, String>> alerts = patternStream.select(
                new PatternTimeoutFunction<BAMEvent, String>() {
                    private static final long serialVersionUID = -8717561187522704500L;

                    @Override
                    public String timeout(Map<String, List<BAMEvent>> map, long l) throws Exception {
                        return "TimedOut: " + map.toString() + " @ " + l;
                    }
                },
                new PatternSelectFunction<BAMEvent, String>() {
                    private static final long serialVersionUID = 3144439966791408980L;

                    @Override
                    public String select(Map<String, List<BAMEvent>> pattern) throws Exception {
                        BAMEvent bamEvent = pattern.get("first").get(0);
                        return "Matched Events: " + bamEvent.getEventId() + "_" + bamEvent.getEventName();
                    }
                });

        alerts.print();
        env.execute("CEP monitoring job");
    }
}
Even when I am using event time, I am getting events from Kafka, as can be
seen from events.print().
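In case it helps with diagnosing this, I could attach something like the following to inspect the watermark. This is a debugging sketch on my part, assuming DataStream#process with a ProcessFunction is available on non-keyed streams in this Flink version:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical debugging pass-through: print each event's timestamp next
// to the current watermark, to see whether the watermark advances at all.
DataStream<BAMEvent> debugged = events.process(new ProcessFunction<BAMEvent, BAMEvent>() {
    @Override
    public void processElement(BAMEvent event, Context ctx, Collector<BAMEvent> out) throws Exception {
        System.out.println("event ts = " + ctx.timestamp()
                + ", watermark = " + ctx.timerService().currentWatermark());
        out.collect(event);
    }
});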