Hi,
I wrote a small Flink program on a YARN cluster (128 GB RAM and an 8-core Xeon CPU per node), essentially reading messages from Kafka and applying a simple CEP rule to those messages. The program is expected to have a parallelism of 1 for the input, as my test Kafka topic has only one partition.
The program looks pretty much like this:
FlinkKafkaConsumer<Databean> flinkKafkaConsumer = new FlinkKafkaConsumer<>(sourceTopicName, deserializer, properties);
flinkKafkaConsumer.setStartFromGroupOffsets();
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);
flinkKafkaConsumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Databean>(Time.minutes(30)) {
            @Override
            public long extractTimestamp(Databean element) {
                return element.getTs();
            }
        });
SingleOutputStreamOperator<Databean> kafkaSource = env.addSource(flinkKafkaConsumer);
// Keying here is very slow???
KeyedStream<Databean, String> keyedStream = kafkaSource.keyBy(bean -> bean.getUser());
Pattern<Databean, Databean> cepPattern = Pattern
        .<Databean>begin("firstStep", AfterMatchSkipStrategy.skipPastLastEvent())
        .where(new FirstFilter())
        .followedBy("secondStep").where(new SecondFilter())
        .within(Time.minutes(15));
PatternStream<Databean> patternMatchStream = CEP.pattern(keyedStream, cepPattern);
SingleOutputStreamOperator<Alarmbean> beanAlerts = patternMatchStream.select(new MatchToAlarmConverter());
beanAlerts.addSink(new FlinkKafkaProducer<>(config.kafkaAlertsTopic, new AlarmBeanSerializeSchema(), properties));
The applied CEP conditions "FirstFilter" and "SecondFilter" are very simple rules along the lines of:
return "humidity".equals(bean.getRecordedMetricType());
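For reference, here is a minimal sketch of what such a condition looks like (assuming the filters extend Flink CEP's SimpleCondition; the class and field names are taken from the snippet above):

import org.apache.flink.cep.pattern.conditions.SimpleCondition;

// Stateless CEP condition matching on a single field of the event.
public class FirstFilter extends SimpleCondition<Databean> {
    @Override
    public boolean filter(Databean bean) {
        return "humidity".equals(bean.getRecordedMetricType());
    }
}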
My Databean has roughly 50 fields containing numbers and small strings (up to 50 characters).
Currently, I write 200,000 elements into my Kafka topic every 5 minutes. 100,000 of those share the same username ("empty"), while the other half are almost unique (a random number between 1 and 100,000,000). The event timestamp of each element varies randomly by ±7.5 minutes around the generation time (generation time = time pushed into Kafka).
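To make the data distribution concrete, the generator logic is roughly the following sketch (hypothetical setter names; my actual producer code differs):

import java.util.Random;

// Sketch of the test-data skew: half the records share the key "empty",
// the other half get an almost unique key; event time jitters by +-7.5 minutes.
Random random = new Random();
for (int i = 0; i < 200_000; i++) {
    Databean bean = new Databean();
    bean.setUser(i % 2 == 0 ? "empty" : String.valueOf(random.nextInt(100_000_000) + 1)); // hypothetical setter
    long jitterMs = (long) ((random.nextDouble() * 2 - 1) * 450_000); // +-7.5 minutes in ms
    bean.setTs(System.currentTimeMillis() + jitterMs); // hypothetical setter
    // ... serialize and push the bean into the source topic
}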
My CEP rule is written with conditions that never match, so the Kafka sink as well as the pattern select function can be ruled out as causes of the slow processing speed.
I start the application via yarn with:
"${FLINK_HOME}/bin/yarn-session.sh" -n 4 -jm 4096m -tm 65536m --name "TESTJOB" -d
${FLINK_HOME}/bin/flink run -m ${FLINK_HOST} -d -n "${JOB_JAR}" $*
So the job has plenty of RAM available, but I didn't notice any difference in speed between assigning 16 GB and 64 GB of RAM. As expected, I have a single task manager and parallelism 1.
Now about my problem:
Currently, the pipeline processes roughly 150-300 elements per second. On startup, it peaks at 3,000-4,000 elements per second, but within one minute it slows down to 150-300 elements per second.
I immediately suspected CEP to be that slow (as this is my first CEP experiment), but what I observed points elsewhere: the second task seems to consume any messages in its input queue immediately. This leads me to suspect that I don't have a CEP problem but rather a problem with "keyBy" on localhost. So my main question is: why is the keyBy step this slow, and what can I do about it?
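One way to verify this would be a simple pass-through throughput probe (just a sketch, not part of my actual job; the names are mine):

import org.apache.flink.api.common.functions.RichMapFunction;

// Logs the observed records/second so the slow operator can be isolated
// by placing one probe before the keyBy and one after it.
public class ThroughputProbe extends RichMapFunction<Databean, Databean> {
    private transient long count;
    private transient long windowStart;

    @Override
    public Databean map(Databean value) {
        if (windowStart == 0) {
            windowStart = System.currentTimeMillis();
        }
        count++;
        long now = System.currentTimeMillis();
        if (now - windowStart >= 1_000) {
            System.out.println(getRuntimeContext().getTaskName() + ": " + count + " records/sec");
            count = 0;
            windowStart = now;
        }
        return value;
    }
}

Attached to kafkaSource it would show the ingest rate; attached right after the keyBy (in a test run without the CEP operator) it would show the rate after the network shuffle that keyBy introduces.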
Best regards
Theo Diefenthal