FlinkCEP latency/throughput

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

FlinkCEP latency/throughput

Sonex
Hello everyone,

I am testing some patterns with FlinkCEP and I want to measure latency and throughput when using 1 or more processing cores. How can I do that ??

What I have done so far:
Latency: Each time an event arrives I store the system time (System.currentTimeMillis). When flink calls the select function which means we have a full pattern match, again I take the system time. The difference of the system time taken from the first event of the complex event and the system time taken when the function is called is the latency for now.

Throughput: I divide the total number of the events of the dataset by the time taken to complete the experiment.
Reply | Threaded
Open this post in threaded view
|

Re: FlinkCEP latency/throughput

Kostas Kloudas
Hello Alfred,

As a first general remark, Flink was not optimized for multicore deployments
but rather for distributed environments. This implies overheads (serialization,
communication etc), when compared to libs optimized for multicores. So there
may be libraries that are better optimized for those settings if you are planning
to use just a multicore machine.

Now for your suggestion:

> On May 16, 2017, at 6:03 PM, Sonex <[hidden email]> wrote:
>
> Hello everyone,
>
> I am testing some patterns with FlinkCEP and I want to measure latency and
> throughput when using 1 or more processing cores. How can I do that ??
>
> What I have done so far:
> Latency: Each time an event arrives I store the system time
> (System.currentTimeMillis). When flink calls the select function which means
> we have a full pattern match, again I take the system time. The difference
> of the system time taken from the first event of the complex event and the
> system time taken when the function is called is the latency for now.
>

1) If you are using event time, then you are also accounting for internal buffering and
ordering of the incoming events.
 
2) I am not sure if measuring the time between the arrival of each element, and when
its matching pattern is emitted makes much sense. In a long pattern, the first element
in the matching pattern will wait inevitably longer than the last one, right?

> Throughput: I divide the total number of the events of the dataset by the
> time taken to complete the experiment.
>
>

For throughput you could create a job with a sink that does nothing and only a CEP pattern
in your job and count the elements read by your source/min. If your source is not the bottleneck
then the CEP part of the pipeline is the dominating factor (given that your sink just discards everything
so it cannot create backpressure).

I hope this helps,
Kostas
Reply | Threaded
Open this post in threaded view
|

Re: FlinkCEP latency/throughput

Dean Wampler


On Wed, May 17, 2017 at 10:34 AM, Kostas Kloudas <[hidden email]> wrote:
Hello Alfred,

As a first general remark, Flink was not optimized for multicore deployments
but rather for distributed environments. This implies overheads (serialization,
communication etc), when compared to libs optimized for multicores. So there
may be libraries that are better optimized for those settings if you are planning
to use just a multicore machine.

Now for your suggestion:
... 

If you're interested in a multi-core option, check out Akka Streams or perhaps the underlying Actor Model



--
Reply | Threaded
Open this post in threaded view
|

FlinkKafkaConsumer using Kafka-GroupID?

Valentin
Hi there,

As far as I understood, Flink Kafka Connectors don’t use the consumer group management feature from Kafka. Here the post I got the info from:

For some reasons we cannot set up a flink-cluster environment, but we still need to assure high availability. e.g. in case one node goes down the second should still keep on running.


My question:
- Is there any chance to run 2 different flink (standalone) apps consuming messages from a single kafka-topic only once? This is what I could do by using 2 native Kafka-Consumers within the same consumer-group.

Many thanks in advance
Valentin 
 
Reply | Threaded
Open this post in threaded view
|

Re: FlinkKafkaConsumer using Kafka-GroupID?

Tzu-Li (Gordon) Tai
Hi Valentin!

Your understanding is correct, the Kafka connectors do not use the consumer group functionality to distribute messages across multiple instances of a FlinkKafkaConsumer source. It’s basically determining which instances should be assigned which Kafka partitions based on a simple round-robin distribution.

Is there any chance to run 2 different flink (standalone) apps consuming messages from a single kafka-topic only once? This is what I could do by using 2 native Kafka-Consumers within the same consumer-group.

Therefore, I don’t think this is possible with the FlinkKafkaConsumers. However, this is exactly what Flink’s checkpointing and savepoints is designed for.
If your single app fails, using checkpoints / savepoints the consumer can just re-start from the offsets in that checkpoint / savepoint.
In other words, with Flink’s streaming fault tolerance mechanics, you will get exactly-once guarantees across 2 different runs of the app.
The FlinkKafkaConnector docs should explain this thoroughly [1].

Does this address what your concerns?

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html#kafka-consumers-and-fault-tolerance


On 18 May 2017 at 1:35:35 AM, Valentin ([hidden email]) wrote:

Hi there,

As far as I understood, Flink Kafka Connectors don’t use the consumer group management feature from Kafka. Here the post I got the info from:

For some reasons we cannot set up a flink-cluster environment, but we still need to assure high availability. e.g. in case one node goes down the second should still keep on running.


My question:
- Is there any chance to run 2 different flink (standalone) apps consuming messages from a single kafka-topic only once? This is what I could do by using 2 native Kafka-Consumers within the same consumer-group.

Many thanks in advance
Valentin 
 
Reply | Threaded
Open this post in threaded view
|

Re: FlinkCEP latency/throughput

Sonex
In reply to this post by Kostas Kloudas
Hello Kostas,

thanks for your response. Regarding throughput, it makes sense.

But there is still one question remaining. How can I measure the latency of my FlinkCEP application ???

Maybe you answered it, but I didn`t quite get that. As far as your number 2 question about measuring latency, the answer is yes, the first element in the matching pattern will wait inevitably longer than the last one

Thank you for your time!!!
Reply | Threaded
Open this post in threaded view
|

Re: FlinkCEP latency/throughput

Dawid Wysakowicz
Hello Alfred,

Just some considerations  from my side as for the latency. I think the first step should be defining what does "latency" for a CEP library really means.
The first thing that comes to my mind is the time period between the arrival of an event that should trigger a match (ending pattern) and actual time when the match is emitted(for that case a select function is a good place I think).

I think Kostas was also referring to similar kind of issue.

Hope it will be helpful.

Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

Data/Software Engineer

Skype: dawid_wys | Twitter: @OneMoreCoder


2017-05-19 10:59 GMT+02:00 Sonex <[hidden email]>:
Hello Kostas,

thanks for your response. Regarding throughput, it makes sense.

But there is still one question remaining. How can I measure the latency of
my FlinkCEP application ???

Maybe you answered it, but I didn`t quite get that. As far as your number 2
question about measuring latency, the answer is yes, the first element in
the matching pattern will wait inevitably longer than the last one

Thank you for your time!!!



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-latency-throughput-tp13170p13221.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.