This post was updated on .
Initially, we had a flink pipeline which did the following -
Kafka source -> KeyBy ID -> Map -> Kafka Sink. We now want to enrich this pipeline with a side input from a file. Similar to the recommendation here <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-pattern-for-achieving-stream-enrichment-side-input-from-a-large-static-source-td25771.html#a25780> , we modified our pipeline to have the following flow. Kafka -------------------> KeyBy ID------ -----Co flat map-----> Sink | | HDFS Source ----------->KeyBy ID ------ (ID, static data) The file is relatively small( a few GBs) but not small enough to fit in RAM of the driver node. So we cannot load it there. The issue I am facing is that we are having high back pressure in this new pipeline after adding the file source. The initial job(without side inputs) worked fine with good through put. However, after adding the second source and a co flat Map function, even 4X ing the amount of parallelism does not solve the problem and has high back pressure. 1. The back pressure is high between the Kafka source and the coFlatMap function. Is this expected? Could some one help with the reasoning behind why this new pipeline is much more resource intensive than the original pipeline? 2. Are there any caveats to keep in mind when connecting a bounded stream with an unbounded stream? 3. Also, is this the recommended design pattern for enriching a stream with a static data set? Is there a better way to solve the problem? Also we are using Flink-1.6.3 so we do not have the ability to use any API advancements in the future versions of Flink. Thanks! -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi, Sundar 1. I think you might need to jstack the java process that is a bottleneck and find where the task is stuck. 2. Could you share the code that your job looked like? I think maybe it could help people to know what exactly happened. Best, Guowei sundar <[hidden email]> 于2020年5月20日周三 上午5:24写道: Initially, we had a flink pipeline which did the following - |
This post was updated on .
Hi Guaowei,
Here is what the code for my pipeline looks like. Class CoFlatMapFunc extends CoFlatMapFunction<FileInput,KafkaInput,KafkaOutput> { ValueState<FileInput> cache; public void open(Configuration parameters){ //initialize cache } //read element from file and update cache. public void flatMap1(FileInput fileInput, Collector<KafkaOutput> collector){ cache.update(fileInput); } //read element from kafka, look up cache and output tuple. public void flatMap2(KafkaInput kafkaInput, Collector<KafkaOutput> collector){ return new KafkaOutput(kafkaInput,cache.value()); } } // Old pipeline that works fine. Class OldFlinkPipeline { public SingleOutputStreamOperator<KafkaOutput> generateOutput(StreamExecutionEnvironment env){ DataStream<KafkaInput> kafkaStream = env .addSource(new KafkaSourceFunction()); return kafkaStream .map(kafkaInput -> new KafkaOutput(kafkaInput, null /*fileInput*/ ); } } //New pipeline that is consuming more than 4X the resources. Class NewFlinkPipeline { public SingleOutputStreamOperator<KafkaOutput> generateOutput(StreamExecutionEnvironment env){ KeyedStream<KafkaInput,ID> kafkaStream = env .addSource(new KafkaSourceFunction()) .keyBy(kafkaInput -> kafkaInput.getId()); KeyedStream<FileInput,ID> fileStream = env .readTextFile("file.txt") .keyBy(fileInput -> fileInput.getId()); return fileStream .connect(kafkaStream) .coFlatMap(new CoFlatMapFunc()) } } Please do let me know if this is the recommended way to connect a bounded stream with an unbounded stream, or if I am doing something obviously expensive here. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi Sundar, 1. Could you check the GC status of the process? or you could increase the memory size of your TM. (I find that you use the value state and I assume that you use the MemoryStatebackend) 2. AFAIK there is no performance limitation in using the `connect` operator for mixing the bounded/unbounded stream. Best, Guowei sundar <[hidden email]> 于2020年5月20日周三 上午9:54写道: Hi Guaowei, |
Hi Sundar, in general, you wouldn't load the static data in the driver, but upon opening the map on the processing nodes. If your processing nodes could hold the data, it might be the easiest to switch to this pattern. You usually load it once per node across all subtasks by using some kind of static map. I can go into details if you are interested. Now for the actual question. 1. As soon as you use Co* functions, your pipeline cannot be chained anymore and needs to go over the network. I suspect that this is where the lag comes from. You can try to tweak the network configurations if you use a high degree of parallelism [1]. 2. Not that I know of. 3. See above. On Wed, May 20, 2020 at 4:55 AM Guowei Ma <[hidden email]> wrote:
-- Arvid Heise | Senior Java Developer Follow us @VervericaData -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbHRegistered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng |
Thanks a lot for all the help!
I was able to figure out the bug just now. I had some extra code in the coFlatMap function(emitting stats) which was inefficient and causing high GC usage. Fixing that fixed the issue. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Free forum by Nabble | Edit this page |