Dear entity that represents Flink user community,
In order to formulate the question itself, I need to describe the problem in some detail, so please bear with me for a while.

I have the following execution graph (slightly simplified):

KafkaSource -> Message parser -> keyBy -> TCP assembly -> keyBy -> Storage -> keyBy -> Classifier -> KafkaSink

When I noticed less-than-ideal throughput, I ran a profiler, which identified org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(PushingAsyncDataInput$DataOutput) as a major hotspot (83% of the time is spent there). 45% of the total time is spent in org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(IOReadableWritable).

The serialization is protobuf via Kryo; according to benchmarks it isn't particularly slow and should be comparable to, or slightly faster than, POJO serialization.

The problem, from my point of view, is that serialization shouldn't happen at all unless data is actually sent over the network to another node (in my case I have one job manager and one task manager). However, I suspect that the keyBy operation implicitly forces serialization/deserialization.

First question: in this particular case the key is exactly the same for every keyBy. Is there any way, other than combining the operations into a single operator, to avoid the performance impact of the keyBy chain?

Second question: could I use a process function after keyBy in such a way that it does not merge the stream back, i.e. it continues to be a KeyedStream?

Third question: could I somehow specify that a sequence of operators must be executed in the same thread, without serialization/deserialization operations in between?

Best regards,
Alexander
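P.S. In code, the graph above looks roughly like the following sketch. The Message type, the getSessionId key, and the operator classes are simplified placeholders rather than the real names:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // kafkaSource / kafkaSink construction omitted; MessageParser, TcpAssembly,
    // Storage and Classifier stand in for the real operator implementations.
    DataStream<Message> parsed = env
            .addSource(kafkaSource, "KafkaSource")
            .map(new MessageParser());

    parsed
            .keyBy(Message::getSessionId)   // shuffle no. 1
            .process(new TcpAssembly())
            .keyBy(Message::getSessionId)   // shuffle no. 2, same key
            .process(new Storage())
            .keyBy(Message::getSessionId)   // shuffle no. 3, same key
            .process(new Classifier())
            .addSink(kafkaSink);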
Hi Alex,
If you are sure that the operations in between do not change the partitioning of the data and the key stays constant for the whole pipeline, you could use reinterpretAsKeyedStream[1]. I guess this answers your questions 1 & 2.

As for the third question, first of all you should look into enabling object reuse[2]; make sure, though, that you work with immutable objects. Secondly, all operators that simply forward records should be chained by default. If you need finer-grained control over chaining, you can look into these docs[3].

Best,
Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/experimental/#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/execution_configuration/#execution-configuration
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups
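For illustration, a minimal sketch of the reinterpretAsKeyedStream approach from [1], reusing the placeholder Message type and getSessionId key from the sketch in the question (an illustration of the experimental API, not code from the original thread):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamUtils;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    // The first keyBy performs the actual partitioning.
    KeyedStream<Message, String> keyed = parsed.keyBy(Message::getSessionId);
    DataStream<Message> assembled = keyed.process(new TcpAssembly());

    // TcpAssembly neither changed the key nor repartitioned the records, so
    // instead of a second keyBy (another serializing shuffle) the stream can
    // be declared as still keyed:
    KeyedStream<Message, String> stillKeyed =
            DataStreamUtils.reinterpretAsKeyedStream(assembled, Message::getSessionId);
    DataStream<Message> stored = stillKeyed.process(new Storage());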
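And a sketch of object reuse [2] and chaining control [3], continuing the snippet above (again illustrative only; whether object reuse is safe depends on the actual functions):

    // Skips the defensive copy between chained operators; only safe if no
    // function caches or mutates records after emitting them.
    env.getConfig().enableObjectReuse();

    // Forward-connected operators with equal parallelism are chained into a
    // single task by default (one thread, no ser/de in between). Chaining
    // can be tuned per operator:
    stored
            .process(new Classifier())
            .startNewChain();   // or .disableChaining() to isolate the operator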
Hi Alex,

I cannot reproduce the issue. Do you mind checking whether it might be an issue on your side?

P.S. It would be nice if you could reply to the mailing list as well. That way other people can benefit from the answers, and more people can help answer your questions.

Best,
Dawid