Flink + Kafka + Scalabuff issue

Alexander Gryzlov

Hello,

 

Has anyone tried using ScalaBuff (https://github.com/SandroGrzicic/ScalaBuff) with Flink? We’re trying to consume Protobuf messages from Kafka 0.8 and have hit a performance issue. We run this code:

 

https://gist.github.com/clayrat/05ac17523fcaa52fcc5165d9edb406b8 (where Foo is pre-generated by ScalaBuff compiler)
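
Roughly, the job is shaped like the sketch below (names such as FooSchema, the Kafka properties, and the Foo.parseFrom call are illustrative assumptions rather than the exact gist contents):

import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.DeserializationSchema

// Decodes raw Kafka payloads into the ScalaBuff-generated Foo message.
class FooSchema extends DeserializationSchema[Foo] {
  override def deserialize(message: Array[Byte]): Foo = Foo.parseFrom(message) // assumed ScalaBuff API
  override def isEndOfStream(nextElement: Foo): Boolean = false
  override def getProducedType: TypeInformation[Foo] = createTypeInformation[Foo]
}

object FooConsumerJob {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("zookeeper.connect", "zookeeper:2181") // placeholder addresses
    props.setProperty("bootstrap.servers", "broker-1:9092")
    props.setProperty("group.id", "foo-consumer")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .addSource(new FlinkKafkaConsumer08[Foo]("foo-topic", new FooSchema, props))
      .flatMap(foo => Seq(foo)) // stand-in for the real flat map
      .print()

    env.execute("Foo consumer")
  }
}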

 

and get these numbers (while the topic produces 20K msg/sec on average):

During the last 153 ms, we received 997 elements. That's 6516.339869281046 elements/second/core.
During the last 214 ms, we received 998 elements. That's 4663.551401869159 elements/second/core.
During the last 223 ms, we received 1000 elements. That's 4484.304932735426 elements/second/core.
During the last 282 ms, we received 1000 elements. That's 3546.0992907801415 elements/second/core.
During the last 378 ms, we received 1001 elements. That's 2648.1481481481483 elements/second/core.
During the last 544 ms, we received 999 elements. That's 1836.3970588235293 elements/second/core.
During the last 434 ms, we received 999 elements. That's 2301.84331797235 elements/second/core.
During the last 432 ms, we received 1000 elements. That's 2314.814814814815 elements/second/core.
During the last 400 ms, we received 991 elements. That's 2477.5 elements/second/core.
During the last 296 ms, we received 998 elements. That's 3371.6216216216217 elements/second/core.
During the last 561 ms, we received 1000 elements. That's 1782.5311942959001 elements/second/core.

...

 

The elements/sec/core rate keeps falling until it stabilizes at ~5-10 elements/sec after a few hours.

 

Inspecting the app with JMX gives the following thread-state breakdown (first number is time spent in RUNNING, second in MONITOR):

SimpleConsumer - Source: Custom Source -> Flat Map - broker-1  10.8% 89.1%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-2  14.0% 85.9%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-3  13.6% 86.3%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-4  12.6% 87.3%

SimpleConsumer - Source: Custom Source -> Flat Map - broker-10 12.2% 87.7%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-11 15.6% 84.3%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-12 11.6% 88.3%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-13  9.7% 90.2%

If the schema is modified to simply return the raw Array[Byte], we get the expected speed of ~20K msg/sec and RUNNING is at 100% on all broker threads.
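
The pass-through variant we compare against looks roughly like this (again a sketch, the class name is just illustrative):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.serialization.DeserializationSchema

// Hands the raw Kafka payload through untouched, skipping the Protobuf decode entirely.
class RawBytesSchema extends DeserializationSchema[Array[Byte]] {
  override def deserialize(message: Array[Byte]): Array[Byte] = message
  override def isEndOfStream(nextElement: Array[Byte]): Boolean = false
  override def getProducedType: TypeInformation[Array[Byte]] = createTypeInformation[Array[Byte]]
}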

 

From a thread dump it’s clear that only a single consumer thread works at a time, while the rest are blocked on sourceContext.getCheckpointLock() at https://github.com/apache/flink/blob/release-1.0/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java#L663
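
As a toy model of that locking pattern (not the connector's actual code): every per-broker thread funnels its emits through the one shared lock, so whatever per-record work happens under it caps the whole source, no matter how many threads there are:

import java.util.concurrent.atomic.AtomicLong

object CheckpointLockDemo {
  def main(args: Array[String]): Unit = {
    val checkpointLock = new Object
    val emitted = new AtomicLong()

    // Eight stand-ins for the per-broker SimpleConsumer threads, all emitting
    // through the same lock.
    for (_ <- 1 to 8) {
      val t = new Thread(new Runnable {
        override def run(): Unit = while (true) {
          checkpointLock.synchronized {
            Thread.sleep(1) // stand-in for the per-record work done under the lock
            emitted.incrementAndGet()
          }
        }
      })
      t.setDaemon(true)
      t.start()
    }

    for (_ <- 1 to 10) {
      val before = emitted.get()
      Thread.sleep(1000)
      // Prints well under 1000 records/sec in total, not 8x that, because the
      // threads take turns on the lock.
      println(s"${emitted.get() - before} records/sec across 8 threads")
    }
  }
}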

Alex

Re: Flink + Kafka + Scalabuff issue

Ufuk Celebi
Hey Alex,

(1) Which Flink version are you using for this?

(2) Can you also get a heap dump after the job slows down? Slowdowns like this are often caused by some component leaking memory, maybe in Flink, maybe in the ScalaBuff deserializer. Can you also share the Foo code?

– Ufuk



Re: Flink + Kafka + Scalabuff issue

rmetzger0
Hi Alex,
I suspect it's a GC issue with the code generated by ScalaBuff. Can you maybe do a standalone test where you use a while(true) loop to see how fast you can deserialize elements of your Foo type?
Maybe you'll find that the JVM's heap usage keeps growing; then there's probably a memory leak somewhere.
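
Something like this would do (just a sketch: it assumes you dump one raw Foo payload to a file, pass its path as the first argument, and that the ScalaBuff-generated companion exposes Foo.parseFrom):

import java.nio.file.{Files, Paths}

// Standalone microbenchmark for the decode path: feed it one raw payload
// captured from Kafka and watch throughput and heap usage over time.
object FooDeserBench {
  def main(args: Array[String]): Unit = {
    val sampleBytes = Files.readAllBytes(Paths.get(args(0)))

    var count = 0L
    var windowStart = System.currentTimeMillis()
    while (true) {
      Foo.parseFrom(sampleBytes) // assumed ScalaBuff-generated parse method
      count += 1
      if (count % 100000 == 0) {
        val now = System.currentTimeMillis()
        val usedMb = (Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory) / (1024 * 1024)
        println(s"100000 msgs in ${now - windowStart} ms, ~$usedMb MB heap in use")
        windowStart = now
      }
    }
  }
}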


Re: Flink + Kafka + Scalabuff issue

Alexander Gryzlov
Hello, 

Just to follow up on this issue: after collecting some data and setting up additional tests, we have managed to pinpoint the issue to the ScalaBuff-generated code that decodes enumerations. After switching to the ScalaPB generator instead, the problem was gone.

One thing peculiar about this bug, however, is that it seems to manifest only on Flink. We have a number of ad-hoc streaming pipelines (without Flink) that still use the very same decoder code and have been running for weeks without any apparent memory or performance issues. The Flink versions we saw this happening on are 1.0 and 1.0.1.

Alex


Re: Flink + Kafka + Scalabuff issue

rmetzger0
Hi Alex,
thanks for the update. I'm happy to hear you were able to resolve the issue.
How are the other ad-hoc streaming pipelines set up?
Maybe these pipelines use a different threading model than Flink. In Flink, we often have many instances of the same serializer running in the same JVM. Maybe there are some static lookup tables in the generated code that are accessed in parallel by many threads in the Flink case, leading to these issues?
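
For example, a generated companion with a shared synchronized cache would behave exactly like that: harmless with a single decoder thread, a contention point (and a potential leak) with many. Purely hypothetical code, not actual ScalaBuff output:

// Hypothetical illustration of the suspected pattern: a static
// (companion-object) lookup shared by every deserializer instance in the JVM.
// One pipeline thread never notices the lock; a dozen Flink source threads
// calling this per record serialize on it, and an unbounded cache would also
// grow the heap over time.
object EnumLookup {
  private val cache = scala.collection.mutable.HashMap.empty[Int, String]

  def valueOf(id: Int): String = cache.synchronized {
    cache.getOrElseUpdate(id, "ENUM_" + id)
  }
}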
