Hi all
We have a pipeline (running on YARN, Flink v1.7.1) that consumes a union of Kafka and HDFS sources. We noticed that throughput is 10 times higher if only one of these sources is consumed. While trying to identify the problem, I implemented a no-op source and unioned it with one of the real sources:

    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

    // Source that emits nothing; used only to force a union in the job graph.
    class NoOpSourceFunction extends ParallelSourceFunction[GenericRecord] {
      override def run(ctx: SourceContext[GenericRecord]): Unit = {}
      override def cancel(): Unit = {}
    }

    mainStream.union(env.addSource(new NoOpSourceFunction()))

I noticed that whenever I use a union with any source like the one above, or union the stream with itself, I get the same performance hit. When I compare the job graphs in the Flink UI, the only difference is that with a union the two sources aren't chained to the subsequent downstream operators (transformation steps); both are connected to them with ship_strategy: FORWARD. When only one source is present, that source is chained to the transformation steps.

To avoid the union (and/or forward partitioning), I tried connecting the streams with a CoFlatMapFunction to get the same result, but without any gain in performance. I was also thinking about reading the HDFS stream in parallel and using the iterate function to feed it back to a previous operator.

After some trial and error, I'd like to ask for your advice. What is the best practice here? Which options or tools are there to analyze the execution plan, apart from the Flink plan visualizer and the provided web UI?

Thanks,
Peter
Hi Peter,

The performance drop is probably due to de/serialization. When tasks are chained, records are simply forwarded as Java objects via method calls. When a task chain is broken into separate tasks, the records (Java objects) are serialized by the sending task, possibly shipped over the network, and deserialized by the receiving task. Depending on the logic of the tasks, this can cause a performance drop.

Two tasks can only be chained if both have the same parallelism, the sender sends to a single task, and the receiver receives from a single task. The union receives from two tasks, which cuts the chain.

Best,
Fabian

On Sat, 13 July 2019 at 15:04, Peter Zende <[hidden email]> wrote:
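
The chaining conditions described in the reply can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: the names Task, Edge and canChain are hypothetical and are not part of Flink's API, the model works at the operator level rather than per subtask, and Flink's real chaining decision likely involves further conditions not mentioned in this thread.

```scala
// Simplified, hypothetical model of the three chaining conditions from the reply.
// None of these types exist in Flink; they are for illustration only.
final case class Task(name: String, parallelism: Int)
final case class Edge(from: Task, to: Task)

object ChainingSketch {
  /** True if `edge` satisfies all three conditions within the graph `allEdges`. */
  def canChain(edge: Edge, allEdges: Seq[Edge]): Boolean = {
    val sameParallelism     = edge.from.parallelism == edge.to.parallelism
    val senderHasOneOutput  = allEdges.count(_.from == edge.from) == 1
    val receiverHasOneInput = allEdges.count(_.to == edge.to) == 1
    sameParallelism && senderHasOneOutput && receiverHasOneInput
  }

  val kafka     = Task("kafka-source", 4)
  val noOp      = Task("no-op-source", 4)
  val transform = Task("transform", 4)

  // One source feeding the transformation: all conditions hold.
  val single: Seq[Edge] = Seq(Edge(kafka, transform))

  // Union of two sources: `transform` now receives from two senders.
  val unioned: Seq[Edge] = Seq(Edge(kafka, transform), Edge(noOp, transform))

  def main(args: Array[String]): Unit = {
    println(s"single source chainable: ${canChain(single.head, single)}")
    unioned.foreach { e =>
      println(s"${e.from.name} -> ${e.to.name} chainable: ${canChain(e, unioned)}")
    }
  }
}
```

In this model the single-source edge passes all three checks, while both edges of the unioned graph fail the "receiver receives from a single task" check, which matches the unchained FORWARD connections seen in the job graph.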