Latency on Flink

Latency on Flink

gdibernardo

Hello everyone,

I am a complete newcomer to streaming engines and big data platforms, but I am considering using Flink for my master's thesis, and before starting to use it I am trying to do some kind of evaluation of the system. In particular, I am interested in observing how the system reacts in terms of latency when it receives a large amount of data to process.

I set up a simple application consisting of:
– a Kafka producer that generates data for a Kafka topic; each data message is distinguished by a source id.

– a Flink consumer app that reads from Kafka and applies some kind of reduction operator to the received data (e.g. calculating the MEAN value of the last 1000 elements received). The Flink consumer keeps the state of the messages coming from a certain source (not sure if this is the most efficient approach, though).

I run this application on AWS using EMR with a relatively simple configuration:
– EMR cluster: Master m3.xlarge (4 vCPU + 15 GiB memory), 2 Core nodes (2 x m3.xlarge)
– Kafka + Zookeeper running on an m4.xlarge (4 vCPU + 16 GiB memory).

I ran the experiment with 2 task managers and 4 slots; I also tried to play with the number of partitions of the Kafka topic, but I experienced really high latency as the number of messages generated per second by the Kafka producer increased. With the simple configuration described above, I saw really high latency when, for example, the producer generates 5000 double values per second; the more messages are created, the more the latency increases.

I would like to ask: even for this super simple experiment, should I scale out my Flink and/or Kafka cluster to observe better performance?

If you have time, you can check out my simple code at: https://github.com/gdibernardo/streaming-engines-benchmark. If you have any suggestions regarding how to improve my experiments, I'd love to hear from you.

Thank you so much.

Best,


Gabriele

Re: Latency on Flink

Aljoscha Krettek

Hi Gabriele,

There are a couple of things you can try to speed up your pipeline:

1. Avoid JSON; parsing and creating it are both slow. You can try using a binary format for your data instead (a rough sketch of such a deserialisation schema is included at the end of this message). If you can’t avoid JSON, look into different JSON parsing libraries and compare their performance.

2. Avoid sending large events and avoid keeping received events when you can immediately aggregate them. I’ll try and explain this point with your code base as an example. Your code does basically this:

ReadFromKafka -> Parse -> BufferEventsAndForward -> FiltersAndStuff -> ComputeAggregation -> WriteToSink

Looking more closely at BufferEventsAndForward: for every event, this function deserialises the state (which contains all events seen so far), adds the new event, serialises the state again, and then emits the current state downstream, where it is eventually used to compute the aggregation in ComputeAggregation. Always serialising/deserialising all events when a new event comes in is quite costly. Sending all events seen so far as a new message downstream (which happens for every incoming event) is also very costly.

Instead you can condense the pipeline to something like this:

ReadFromKafka -> Parse -> IncrementallyAggregate -> WriteToSink

Here, IncrementallyAggregate would not keep all events seen so far but would instead keep state for the aggregation that you want to perform. When you process an event, you simply update the aggregate that you have so far. For example, for “average” you would keep a running sum and a count. You also don’t emit a new message downstream for every incoming event, but instead wait for the “compute condition” to be met (I believe in your case you are waiting for a given number of events to arrive) and then send the aggregation result downstream.
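To make that concrete, here is a rough sketch of what such an incremental aggregation could look like. It assumes the parsed events arrive as Tuple2<Integer, Double> pairs of (source id, value) and that you want one average per source every 1000 events; the class and field names are only illustrative, not taken from your repository.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Sketch only: incrementally averages values per source id and emits one
// (sourceId, average) pair every EMIT_EVERY events. The keyed state holds
// just a running sum and a count instead of all events seen so far.
public class IncrementalAverage
        extends RichFlatMapFunction<Tuple2<Integer, Double>, Tuple2<Integer, Double>> {

    private static final int EMIT_EVERY = 1000; // the "compute condition"

    private transient ValueState<Tuple2<Double, Long>> sumAndCount;

    @Override
    public void open(Configuration parameters) {
        sumAndCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>(
                        "sum-and-count",
                        TypeInformation.of(new TypeHint<Tuple2<Double, Long>>() {})));
    }

    @Override
    public void flatMap(Tuple2<Integer, Double> event,
                        Collector<Tuple2<Integer, Double>> out) throws Exception {
        Tuple2<Double, Long> acc = sumAndCount.value();
        if (acc == null) {
            acc = Tuple2.of(0.0, 0L);
        }
        acc.f0 += event.f1; // running sum
        acc.f1 += 1;        // running count

        if (acc.f1 >= EMIT_EVERY) {
            // Compute condition met: emit the average and reset the aggregate.
            out.collect(Tuple2.of(event.f0, acc.f0 / acc.f1));
            sumAndCount.clear();
        } else {
            sumAndCount.update(acc);
        }
    }
}

You would wire it up roughly as parsed.keyBy(0).flatMap(new IncrementalAverage()).addSink(...), so the state is scoped per source id. A keyed countWindow(1000) with an AggregateFunction would achieve the same effect.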
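And regarding point 1, a binary format for your (source id, value) messages could be as simple as the following sketch. It assumes each Kafka record is a fixed 12-byte payload (a 4-byte int source id followed by an 8-byte double value) and a Flink version where DeserializationSchema lives under org.apache.flink.api.common.serialization; adjust the layout to whatever your producer actually writes.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

// Sketch only: each record is 12 bytes, a 4-byte int source id followed by
// an 8-byte double value. No JSON parsing involved.
public class BinaryEventSchema implements DeserializationSchema<Tuple2<Integer, Double>> {

    @Override
    public Tuple2<Integer, Double> deserialize(byte[] message) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(message);
        return Tuple2.of(buf.getInt(), buf.getDouble());
    }

    @Override
    public boolean isEndOfStream(Tuple2<Integer, Double> nextElement) {
        return false; // unbounded stream
    }

    @Override
    public TypeInformation<Tuple2<Integer, Double>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<Integer, Double>>() {});
    }
}

The producer would write the same 12-byte layout with a ByteBuffer, and you would pass this schema to the Flink Kafka consumer in place of the JSON/string schema.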

Best,
Aljoscha
