Possible way to avoid unnecessary serialization calls.

Possible way to avoid unnecessary serialization calls.

Alex Drobinsky
Dear entity that represents Flink user community,

In order to formulate the question itself, I need to describe the problem in some detail, so please bear with me for a while.

I have the following execution graph:

KafkaSource -> Message parser -> keyBy -> TCP assembly -> keyBy -> Storage -> keyBy -> Classifier -> KafkaSink (this is a slightly simplified version).
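For reference, the job is wired up roughly like this (a minimal sketch; the operator classes and the session-key selector are stand-ins for my actual code, and kafkaSource/kafkaSink are configured elsewhere):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Stand-in operator names; each stage after a keyBy is a keyed process function.
    env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "KafkaSource")
       .map(new MessageParser())
       .keyBy(Packet::getSessionId)
       .process(new TcpAssembly())
       .keyBy(Packet::getSessionId)    // same key as before
       .process(new Storage())
       .keyBy(Packet::getSessionId)    // and the same key again
       .process(new Classifier())
       .sinkTo(kafkaSink);

    env.execute();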

When I noticed less-than-ideal throughput, I ran a profiler, which identified org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(PushingAsyncDataInput$DataOutput) as the dominant function (83% of the time is spent there); 45% of the total time is spent in org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(IOReadableWritable).

The serialization is protobuf with Kryo; according to benchmarks it isn't particularly slow and should perform similarly to, or a bit better than, the POJO serializer.
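For completeness, the protobuf classes are registered with Kryo along these lines (a sketch; MyProtoMessage is a placeholder, and ProtobufSerializer is the one from the twitter chill-protobuf artifact that the Flink docs suggest for protobuf over Kryo):

    import com.twitter.chill.protobuf.ProtobufSerializer;

    // Register the generated protobuf class with Kryo (placeholder class name):
    env.getConfig().registerTypeWithKryoSerializer(MyProtoMessage.class, ProtobufSerializer.class);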

The problem, from my point of view, is that serialization shouldn't happen at all unless data is actually sent over the network to another node (in my case I have one job manager and one task manager).

However, I suspect that the keyBy operation implicitly forces serialization/deserialization.

First question: in this particular case the key is exactly the same for every keyBy. Is there any way, other than combining the operations into a single operator, to avoid the performance impact of the keyBy chain?

Second question: could I use a process function after keyBy in such a way that it does not merge the stream back, i.e. it continues to be a KeyedStream?

Third question: could I somehow specify that a sequence of operators must be executed in the same thread, without serialization/deserialization operations in between?


Best regards,
Alexander 

Re: Possible way to avoid unnecessary serialization calls.

Dawid Wysakowicz
Hi Alex,

If you are sure that the operations in between do not change the partitioning of the data and keep the key constant for the whole pipeline, you could use reinterpretAsKeyedStream[1]. I guess this answers your questions 1 & 2.
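A minimal sketch of how that could look (class and key names borrowed from your description; DataStreamUtils lives in org.apache.flink.streaming.api.datastream, and note the feature is experimental):

    // "assembled" is the output of TcpAssembly; it is already partitioned
    // by the session key, so reinterpret it instead of shuffling again:
    KeyedStream<Packet, String> keyed =
        DataStreamUtils.reinterpretAsKeyedStream(assembled, Packet::getSessionId);

    // Keyed state and timers work as usual, but no extra shuffle or
    // serialization happens at this boundary:
    keyed.process(new Storage());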

As for the third question: first of all, you should look into enabling object reuse[2]; make sure, though, that you work with immutable objects. Secondly, all operators that simply forward records should be chained by default. If you need more fine-grained control over chaining, you can look into the docs[3].
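
Roughly, those two knobs look like this (operator names are only illustrative):

    // Hand records between chained operators without the defensive copy.
    // Safe only if your functions neither cache nor mutate input records.
    env.getConfig().enableObjectReuse();

    // Per-operator chaining hints:
    stream.map(new MessageParser())
          .startNewChain();     // this operator starts a new chain

    stream.map(new Classifier())
          .disableChaining();   // this operator is never chained

    // Global switch, mostly useful for debugging:
    env.disableOperatorChaining();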

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/experimental/#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/execution_configuration/#execution-configuration

[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups


Re: Possible way to avoid unnecessary serialization calls.

Dawid Wysakowicz

Hi Alex,

I cannot reproduce the issue. Do you mind checking whether it could be something on your side?

P.S. It would be nice if you could reply to the mailing list as well. That way other people can benefit from the answers, and there will be more people who could help answer your question.

Best,

Dawid

On 12/05/2021 11:36, Alex Drobinsky wrote:
Hi Dawid,

I upgraded Flink to 1.13.0 and stumbled upon a strange phenomenon (usually I would use the word bug, though): start-cluster.sh overwrites the taskmanager.numberOfTaskSlots setting from my flink-conf.yaml; now it's always set to 1.
Is it a bug or something else?

Best regards,
Alexander

