Improving Flink Performance

Improving Flink Performance

Jonas Gröger
Hello! I'm reposting this since the other thread apparently had some formatting issues; I hope it works this time. I'm having performance problems with a Flink job. If any valuable information is missing, please ask and I will try to answer ASAP. My job looks like this:
    /*
      Settings
     */
    env.setParallelism(4)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    /*
      Operator Graph
     */
    env
      .addSource(new FlinkKafkaConsumer09("raw.my.topic", new SimpleStringSchema(), props)) // 100k msgs/s
      .map(new MyJsonDecoder) // 25k msgs/s
      .map(new AddTypeToJsonForSplitting) // 20k msgs/s
      .split(t => Seq(t._1.name))
      .select(TYPE_A.name) // 18k msgs/s
      .flatMap(new MapJsonToEntity) // 13k msgs/s
      .flatMap(new MapEntityToMultipleEntities) // 10k msgs/s
      .assignTimestampsAndWatermarks(/* Nothing special */) // 6-8k msgs/s

    /*
      Run
     */
    env.execute()
First, I read data from Kafka. This is very fast at 100k msgs/s. The data is decoded and a type is added (we have multiple message types per Kafka topic). Then we select the TYPE_A messages and create a Scala entity (a case class) out of each of them. Afterwards, in MapEntityToMultipleEntities, the Scala entities are split into multiple entities. Finally, a watermark is added. As you can see, the data is not keyed in any way yet. Is there a way to make this faster? Measurements were taken with
  // One newline is written per element so that `pv --line-mode` can count msgs/s.
  def writeToSocket[T](d: DataStream[T], port: Int): Unit = {
    d.writeToSocket("localhost", port, new SerializationSchema[T] {
      override def serialize(element: T): Array[Byte] = {
        "\n".getBytes(CharsetUtil.UTF_8)
      }
    })
  }
and
nc -lk PORT | pv --line-mode --rate --average-rate --format "Current: %r, Avg:%a, Total: %b" > /dev/null
I'm running this on an Intel i5-3470 with 16 GB RAM, Ubuntu 16.04.1 LTS, and Flink 1.1.4.

Re: Improving Flink Performance

Aljoscha Krettek
Hi,
I think MyJsonDecoder is the bottleneck, and I'm afraid there is nothing to be done about that, because parsing Strings to JSON is simply slow.

I think you would see the biggest gains if you had a binary representation that can quickly be serialised/deserialised to objects and used that instead of String/JSON.
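For illustration, here is a minimal sketch of what such a schema could look like; everything in it is made up (the binary layout, the TypeAEvent case class, the BinaryEventSchema and topic names), and in practice you would rather use an established binary format such as Avro or Protobuf. Extending AbstractDeserializationSchema means only deserialize() has to be implemented:

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets

    import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema

    // Hypothetical event type and binary layout, invented for this sketch.
    case class TypeAEvent(typeId: Byte, timestamp: Long, name: String)

    class BinaryEventSchema extends AbstractDeserializationSchema[TypeAEvent] {
      override def deserialize(message: Array[Byte]): TypeAEvent = {
        val buf = ByteBuffer.wrap(message)
        val typeId = buf.get()        // 1 byte: message type
        val timestamp = buf.getLong() // 8 bytes: event timestamp
        val nameLen = buf.getInt()    // 4 bytes: length of the UTF-8 name field
        val nameBytes = new Array[Byte](nameLen)
        buf.get(nameBytes)
        TypeAEvent(typeId, timestamp, new String(nameBytes, StandardCharsets.UTF_8))
      }
    }

    // Usage, replacing SimpleStringSchema + MyJsonDecoder (hypothetical topic name):
    // env.addSource(new FlinkKafkaConsumer09("binary.my.topic", new BinaryEventSchema, props))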

Cheers,
Aljoscha

Re: Improving Flink Performance

Stephan Ewen
One thing you can try is to enable object reuse in the execution config.
That should get rid of the overhead when passing the JSON objects from function to function.
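A minimal sketch of that, using the environment from the original job:

    // Let Flink hand the same record instance from one chained function to the
    // next instead of defensively copying it between operators.
    env.getConfig.enableObjectReuse()

The usual caveat: with object reuse enabled, user functions must not buffer or modify input records after emitting them.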

Re: Improving Flink Performance

Jonas Gröger
In reply to this post by Aljoscha Krettek
The performance hit from decoding the JSON is expected, and there is not a lot I can do about that (except for changing the encoding). Alright.

When joining the above stream with another stream (CoFlatMap with state), I take another performance hit of ~80%, so that in the end only 1k msgs/s remain. Do you know how to improve that? Might setting the buffer size / timeout be worth exploring?
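To be concrete, the knob I have in mind is the buffer timeout; a sketch (the 500 ms value is arbitrary, just something to experiment with):

    // Network buffers are shipped when they are full or after this timeout
    // (Flink's default is 100 ms); a larger value trades latency for throughput.
    env.setBufferTimeout(500)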

Re: Improving Flink Performance

Stephan Ewen
Have you tried the object reuse option mentioned above?

Re: Improving Flink Performance

Jonas Gröger
I tried it, and it improved performance a little (~10%), but nothing outstanding.

Re: Improving Flink Performance

Jonas Gröger
I ran a profiler on my job and it seems that most of the time it's waiting :O See here:



Also, the following code snippet executes unexpectedly slowly:
def announceAttributeFast(packet: Packet, out: Collector[Either[This, That]]): Unit = {
  for (f <- packet.one) {
    for (o <- f.two) {
      for (r <- o.three) {
        out.collect(Left(This(f, o, r)))
      }
    }
  }
}  
as you can see in this call graph:


Any ideas?

Re: Improving Flink Performance

Matt
Off-topic: what profiler is it that you're using?

Re: Improving Flink Performance

Jonas Gröger
JProfiler

Re: Improving Flink Performance

rmetzger0
Hi Jonas,

The good news is that your job is completely parallelizable. So if you are running it on a cluster, you can scale it at least to the number of Kafka partitions you have (actually even further, because the Kafka consumers are not the issue).

I don't think that the Scala (= Akka) worker threads are really the thing that slows everything down. These threads should usually idle.
I just tried it with VisualVM (I don't own a JProfiler license :) ) and you can nicely see what's eating up CPU resources in my job: http://i.imgur.com/nqXeHdi.png
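To illustrate the scaling point with made-up numbers (8 partitions, a cluster with spare task slots): the source parallelism is capped by the partition count, but the CPU-bound decoding can go wider:

    val raw = env
      .addSource(new FlinkKafkaConsumer09("raw.my.topic", new SimpleStringSchema(), props))
      .setParallelism(8)   // matches the (made-up) partition count; extra source subtasks would idle

    val decoded = raw
      .map(new MyJsonDecoder)
      .setParallelism(16)  // the JSON decoding is CPU-bound and parallelises freely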

Re: Improving Flink Performance

Stephan Ewen
@Jonas: Flink's fork-join pool only drives the actors, which do coordination work. Unless your job is permanently failing/recovering, they don't do much.

Re: Improving Flink Performance

Jonas Gröger
Using a profiler, I found out that most of the time (80%) was spent in a domain-specific data structure. After reimplementing it with a more efficient one, the performance problems are gone.

Re: Improving Flink Performance

Fabian Hueske-2
Hi Jonas,

thanks for reporting back!
Glad you solved the issue.

Cheers, Fabian
