Hello!
I'm reposting this since the other thread apparently had some formatting issues. I hope it works this time.
I'm having performance problems with a Flink job. If there is anything valuable missing, please ask and I will try to answer ASAP. My job looks like this:
/* Settings */
env.setParallelism(4)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

/* Operator Graph */
env
  .addSource(new FlinkKafkaConsumer09("raw.my.topic", new SimpleStringSchema(), props)) // 100k msgs/s
  .map(new MyJsonDecoder)                                // 25k msgs/s
  .map(new AddTypeToJsonForSplitting)                    // 20k msgs/s
  .split(t => Seq(t._1.name))
  .select(TYPE_A.name)                                   // 18k msgs/s
  .flatMap(new MapJsonToEntity)                          // 13k msgs/s
  .flatMap(new MapEntityToMultipleEntities)              // 10k msgs/s
  .assignTimestampsAndWatermarks(/* Nothing special */)  // 6-8k msgs/s

/* Run */
env.execute()

First, I read data from Kafka. This is very fast at 100k msgs/s. The data is decoded and a type is added (we have multiple message types per Kafka topic). Then we select the TYPE_A messages and create a Scala entity (a case class) out of each one. Afterwards, MapEntityToMultipleEntities splits each Scala entity into multiple entities. Finally, timestamps and watermarks are assigned. As you can see, the data is not keyed in any way yet. Is there a way to make this faster?

Measurements were taken with

def writeToSocket[T](d: DataStream[T], port: Int): Unit = {
  d.writeToSocket("localhost", port, new SerializationSchema[T] {
    // Write only a newline per element, so pv can count one line per record.
    override def serialize(element: T): Array[Byte] =
      "\n".getBytes(CharsetUtil.UTF_8)
  })
}

and

nc -lk PORT | pv --line-mode --rate --average-rate --format "Current: %r, Avg:%a, Total: %b" > /dev/null

I'm running this on an Intel i5-3470 with 16 GB RAM, Ubuntu 16.04.1 LTS, and Flink 1.1.4.
Hi, I think MyJsonDecoder is the bottleneck and I'm also afraid there is nothing to do because parsing Strings to Json is simply slow. I think you would see the biggest gains if you had a binary representation that can quickly be serialised/deserialised to objects and you use that instead of String/JSON. Cheers, Aljoscha
On Tue, 24 Jan 2017 at 12:17 Jonas wrote:
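To illustrate the kind of binary representation Aljoscha describes, here is a minimal sketch in plain Scala. The `Event` record and its three fields are hypothetical stand-ins for the TYPE_A entity; the real message layout isn't given in this thread. Fields are written at fixed offsets with `java.nio.ByteBuffer`, so decoding is a handful of primitive reads instead of a String-to-JSON parse.

```scala
import java.nio.ByteBuffer

// Hypothetical record type standing in for the job's TYPE_A entity.
final case class Event(id: Long, kind: Int, value: Double)

object EventCodec {
  // Fixed layout: 8 bytes id + 4 bytes kind + 8 bytes value.
  val RecordSize: Int = 8 + 4 + 8

  def encode(e: Event): Array[Byte] = {
    val buf = ByteBuffer.allocate(RecordSize)
    buf.putLong(e.id).putInt(e.kind).putDouble(e.value)
    buf.array()
  }

  def decode(bytes: Array[Byte]): Event = {
    require(bytes.length == RecordSize, s"expected $RecordSize bytes, got ${bytes.length}")
    val buf = ByteBuffer.wrap(bytes)
    // Arguments are evaluated left to right, i.e. in the order the fields were written.
    Event(buf.getLong(), buf.getInt(), buf.getDouble())
  }
}
```

In the job, a decoder like this could be wrapped in a Flink `DeserializationSchema` and passed to `FlinkKafkaConsumer09` in place of `SimpleStringSchema`, assuming the producers can be changed to write the same layout. A hand-rolled fixed layout is fast but has no schema versioning; that trade-off is an assumption of this sketch.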
One thing you can try is enabling object reuse in the execution config. That should get rid of the overhead when passing the JSON objects from function to function.
On Tue, Jan 24, 2017 at 6:00 PM, Aljoscha Krettek wrote:
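Enabling object reuse, as suggested above, is a one-line change on the environment's `ExecutionConfig`. The sketch below wraps it in a hypothetical helper, `ObjectReuseSettings.configure`, and assumes the Flink Scala streaming API is on the classpath.

```scala
import org.apache.flink.streaming.api.scala._

object ObjectReuseSettings {
  // Hypothetical helper; call it right after creating the environment.
  def configure(env: StreamExecutionEnvironment): Unit = {
    // Chained operators then hand records to each other directly instead of
    // making a defensive copy of every record between functions.
    env.getConfig.enableObjectReuse()
  }
}
```

The caveat is that user functions should then neither hold on to input records across calls nor modify a record after emitting it, because the same instance may be passed on to the next function.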
In reply to this post by Aljoscha Krettek
The performance hit from decoding the JSON is expected, and there is not much I can do about it (except changing the encoding). Alright.
When I join the above stream with another stream (a CoFlatMap with state), throughput drops by another ~80%, so in the end only about 1k msgs/s remain. Do you know how to improve that? Might tuning the buffer size or buffer timeout be worth exploring?
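For the buffer question, the network buffer timeout is set on the environment with `setBufferTimeout`. A buffer is shipped when it is full or when the timeout expires (100 ms by default), so a smaller value lowers latency at some cost in throughput, and `-1` sends buffers only once they are full. Whether this has anything to do with the ~80% drop at the join is not established in this thread. The helper name below is hypothetical, and the Flink Scala streaming API is assumed on the classpath.

```scala
import org.apache.flink.streaming.api.scala._

object BufferTimeoutSettings {
  // Hypothetical helper for throughput-oriented runs.
  def configureForThroughput(env: StreamExecutionEnvironment): Unit = {
    // -1: no timer-based flushing; each network buffer is sent only when full.
    // This favors throughput over latency (the default timeout is 100 ms).
    env.setBufferTimeout(-1)
  }
}
```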
Have you tried the object reuse option mentioned above?
On Tue, Jan 24, 2017 at 6:52 PM, Jonas wrote:
I tried it, and it improved performance a little (~10%), but nothing outstanding.
I ran a profiler on my job, and it seems that most of the time it's waiting :O See here:
Also, the following code snippet executes unexpectedly slowly:
def announceAttributeFast(packet: Packet, out: Collector[Either[This, That]]): Unit = {
  for (f <- packet.one) {
    for (o <- f.two) {
      for (r <- o.three) {
        out.collect(Left(This(f, o, r)))
      }
    }
  }
}

as you can see in this call graph:

Any ideas?
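To make the snippet runnable on its own, the sketch below supplies minimal, hypothetical definitions for `Packet`, `This`, `That` and the nested `one`/`two`/`three` collections, plus a stand-in `Collector` trait in place of Flink's `org.apache.flink.util.Collector`. It shows what the nested loops do: one `Left(This(f, o, r))` per (f, o, r) combination, so the work grows with the total number of combinations, and each emitted record allocates a `This` and a `Left`.

```scala
import scala.collection.mutable.ArrayBuffer

// Minimal stand-in for Flink's Collector so the sketch runs without Flink.
trait Collector[T] { def collect(record: T): Unit }

final class ListCollector[T] extends Collector[T] {
  val items = ArrayBuffer.empty[T]
  def collect(record: T): Unit = items += record
}

// Hypothetical shapes for the types named in the snippet.
final case class R(name: String)
final case class O(three: Seq[R])
final case class F(two: Seq[O])
final case class Packet(one: Seq[F])
final case class This(f: F, o: O, r: R)
final case class That(note: String)

object Announce {
  def announceAttributeFast(packet: Packet, out: Collector[Either[This, That]]): Unit =
    // A for-loop without `yield` desugars into nested foreach calls:
    // exactly one Left(This(...)) is emitted per (f, o, r) combination.
    for {
      f <- packet.one
      o <- f.two
      r <- o.three
    } out.collect(Left(This(f, o, r)))
}
```

If `one`, `two` or `three` are computed on each access rather than stored, that cost would also be attributed to this method in a profiler; in this thread, the eventual culprit turned out to be a domain-specific data structure.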
Off-topic: which profiler are you using?
> On Jan 25, 2017, at 18:11, Jonas wrote:
>
> Images:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11305/Tv6KnR6.png
JProfiler
Hi Jonas,
The good news is that your job is completely parallelizable. So if you are running it on a cluster, you can scale it at least to the number of Kafka partitions you have (actually even further, because the Kafka consumers are not the issue). I don't think the Scala (= Akka) worker threads are really what slows everything down; these threads should usually be idle. I just tried it with VisualVM (I don't own a JProfiler license :) ) and you can nicely see what's eating up CPU resources in my job: http://i.imgur.com/nqXeHdi.png
On Thu, Jan 26, 2017 at 1:23 PM, Jonas wrote:
@Jonas Flink's fork-join pool drives only the actors, which handle coordination. Unless your job is permanently failing and recovering, they don't do much.
On Thu, Jan 26, 2017 at 2:56 PM, Robert Metzger wrote:
Using a profiler, I found that most of the time (80%) was spent in a domain-specific data structure. After replacing it with a more efficient one, the performance problems are gone.
Hi Jonas, thanks for reporting back! Glad you solved the issue.
2017-02-05 22:07 GMT+01:00 Jonas: