CoFlatMap has high back pressure


CoFlatMap has high back pressure

sundar
Initially, we had a Flink pipeline which did the following:
Kafka source -> KeyBy ID -> Map -> Kafka Sink.

We now want to enrich this pipeline with a side input from a file. Similar
to the recommendation here
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-pattern-for-achieving-stream-enrichment-side-input-from-a-large-static-source-td25771.html#a25780>,
we modified our pipeline to have the following flow:

Kafka -------------> KeyBy ID ------+
                                    +----- CoFlatMap -----> Sink
HDFS Source -------> KeyBy ID ------+
(ID, static data)

The file is relatively small (a few GBs) but not small enough to fit in the
RAM of the driver node, so we cannot load it there.

The issue I am facing is that we see high back pressure in this new
pipeline after adding the file source. The initial job (without side inputs)
worked fine with good throughput. However, after adding the second source
and a CoFlatMap function, even 4x-ing the parallelism does not solve the
problem and the back pressure stays high.
1. The back pressure is high between the Kafka source and the CoFlatMap
function. Is this expected? Could someone help with the reasoning behind
why this new pipeline is so much more resource intensive than the original
pipeline?
2. Are there any caveats to keep in mind when connecting a bounded stream
with an unbounded stream?
3. Also, is this the recommended design pattern for enriching a stream with
a static data set? Is there a better way to solve the problem?

Also, we are using Flink 1.6.3, so we cannot use any API advancements from
later versions of Flink.

Thanks!




Re: CoFlatMap has high back pressure

Guowei Ma
Hi, Sundar

1. I think you might need to jstack the Java process that is the bottleneck and find where the task is stuck.
2. Could you share what the code of your job looks like? I think that could help people understand what exactly is happening.

Best,
Guowei



Re: CoFlatMap has high back pressure

sundar
Hi Guowei,
Here is what the code for my pipeline looks like.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Keyed enrichment function: caches the latest file record per key and
// joins it onto every Kafka record with the same key.
class CoFlatMapFunc extends RichCoFlatMapFunction<FileInput, KafkaInput, KafkaOutput> {

       private transient ValueState<FileInput> cache;

       @Override
       public void open(Configuration parameters) {
             // initialize cache
             cache = getRuntimeContext().getState(
                     new ValueStateDescriptor<>("fileInputCache", FileInput.class));
       }

       // Read element from file and update cache.
       @Override
       public void flatMap1(FileInput fileInput, Collector<KafkaOutput> collector) throws Exception {
             cache.update(fileInput);
       }

       // Read element from Kafka, look up cache and output tuple.
       @Override
       public void flatMap2(KafkaInput kafkaInput, Collector<KafkaOutput> collector) throws Exception {
             collector.collect(new KafkaOutput(kafkaInput, cache.value()));
       }
}


// Old pipeline that works fine.
class OldFlinkPipeline {

       public SingleOutputStreamOperator<KafkaOutput> generateOutput(StreamExecutionEnvironment env) {

             DataStream<KafkaInput> kafkaStream = env
                     .addSource(new KafkaSourceFunction());

             return kafkaStream
                     .map(kafkaInput -> new KafkaOutput(kafkaInput, null /* fileInput */));
       }
}


// New pipeline that is consuming more than 4x the resources.
class NewFlinkPipeline {

       public SingleOutputStreamOperator<KafkaOutput> generateOutput(StreamExecutionEnvironment env) {

             KeyedStream<KafkaInput, ID> kafkaStream = env
                     .addSource(new KafkaSourceFunction())
                     .keyBy(kafkaInput -> kafkaInput.getId());

             KeyedStream<FileInput, ID> fileStream = env
                     .readTextFile("file.txt")
                     .map(line -> FileInput.parse(line)) // parse each line into (ID, static data); parser not shown here
                     .keyBy(fileInput -> fileInput.getId());

             return fileStream
                     .connect(kafkaStream)
                     .coFlatMap(new CoFlatMapFunc());
       }
}

Please let me know if this is the recommended way to connect a bounded
stream with an unbounded stream, or if I am doing something obviously
expensive here.





Re: CoFlatMap has high back pressure

Guowei Ma
Hi Sundar,
1. Could you check the GC status of the process? Or you could increase the memory size of your TM. (I see that you use ValueState, and I assume that you use the MemoryStateBackend; see the sketch below.)
2. AFAIK there is no performance limitation in using the `connect` operator to mix a bounded and an unbounded stream.
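
If the MemoryStateBackend does turn out to be the bottleneck, a minimal sketch of keeping the keyed state off the JVM heap could look like the following; the RocksDB backend choice and the checkpoint URI are assumptions, not something from this thread, and it needs the flink-statebackend-rocksdb dependency:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSetup {
       public static void main(String[] args) throws Exception {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             // Keyed state (e.g. the ValueState cache) is kept in RocksDB instead of
             // on the heap, which reduces GC pressure; the checkpoint URI is a placeholder.
             env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
             // ... build the rest of the pipeline on env as before, then call env.execute().
       }
}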

Best,
Guowei



Re: CoFlatMap has high back pressure

Arvid Heise-3
Hi Sundar,

in general, you wouldn't load the static data in the driver, but rather when opening the map function on the processing nodes. If your processing nodes can hold the data, it might be the easiest to switch to this pattern. You usually load it once per node, shared across all subtasks, by using some kind of static map (see the sketch below). I can go into details if you are interested.
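
For illustration, here is a minimal sketch of that pattern, reusing the class names from this thread and assuming the file is readable from every TaskManager node; the FileInput.parse helper and the file path are placeholders, not something from the original job:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Enriches Kafka records from a static map that is loaded once per JVM
// (i.e. once per TaskManager) and shared by all subtasks running in it.
class StaticEnrichmentMapper extends RichMapFunction<KafkaInput, KafkaOutput> {

       // Shared across all subtasks in the same TaskManager JVM.
       private static volatile Map<ID, FileInput> staticData;

       @Override
       public void open(Configuration parameters) throws Exception {
             if (staticData == null) {
                   synchronized (StaticEnrichmentMapper.class) {
                         if (staticData == null) {
                               Map<ID, FileInput> data = new ConcurrentHashMap<>();
                               try (BufferedReader reader = new BufferedReader(new FileReader("file.txt"))) {
                                     String line;
                                     while ((line = reader.readLine()) != null) {
                                           FileInput input = FileInput.parse(line); // placeholder parser
                                           data.put(input.getId(), input);
                                     }
                               }
                               staticData = data;
                         }
                   }
             }
       }

       @Override
       public KafkaOutput map(KafkaInput kafkaInput) {
             return new KafkaOutput(kafkaInput, staticData.get(kafkaInput.getId()));
       }
}

With this, the job would not need the second source or the keyBy at all, e.g. kafkaStream.map(new StaticEnrichmentMapper()).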

Now for the actual question.

1. As soon as you use Co* functions, your pipeline cannot be chained anymore and needs to go over the network. I suspect that this is where the lag comes from. You can try to tweak the network configuration if you use a high degree of parallelism [1].
2. Not that I know of.
3. See above.




--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: CoFlatMap has high back pressure

sundar
Thanks a lot for all the help!
I was able to figure out the bug just now.
I had some extra code in the CoFlatMap function (emitting stats) which was
inefficient and causing high GC pressure. Fixing that resolved the issue.



