Speeding up CoGroup in batch job

3 messages

Speeding up CoGroup in batch job

Ken Krugler
Hi all,

I added a CoGroup to my batch job, and it’s now running much slower, primarily due to back pressure from the CoGroup operator.

I assume it’s because this operator has to sort/buffer-to-disk all incoming data. It looks like about 1TB from one side of the join; currently very little from the other side, but that will grow to about 2TB in the future.
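For reference, the CoGroup is the regular DataSet API kind. A minimal, runnable sketch of the shape (the types, key field, and logic below are stand-ins, not my actual job):

    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class CoGroupSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Tiny stand-ins for the real inputs (~1TB and, later, ~2TB).
            DataSet<Tuple2<Long, String>> left = env.fromElements(
                    Tuple2.of(1L, "a"), Tuple2.of(2L, "b"));
            DataSet<Tuple2<Long, String>> right = env.fromElements(
                    Tuple2.of(1L, "x"));

            left.coGroup(right)
                .where(0)      // key on the first tuple field
                .equalTo(0)
                .with(new CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, String>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<Long, String>> l,
                                        Iterable<Tuple2<Long, String>> r,
                                        Collector<String> out) {
                        // Flink sorts (and spills) each input by key so that both
                        // sides of one key's group arrive here together.
                        for (Tuple2<Long, String> t : l) {
                            out.collect(t.f1);
                        }
                    }
                })
                .print();
        }
    }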

I don’t see a lot of GC, I’m using about 60% of the available network buffers, per-TM server load (across all 8 servers) averages about 40%, and both SSDs on each TM are being used for …/flink-io-xxx/yyy.channel files.

What are techniques for improving the performance of a CoGroup? 

Thanks!

— Ken

--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


Re: Speeding up CoGroup in batch job

rmetzger0
Hi Ken,

Some random ideas that pop up in my head:
- Make sure you use data types that are efficient to serialize and cheap to compare (ideally primitive types in TupleN, or POJOs; there is a sketch after this list).
- Maybe try the Table API batch support (if you have time to experiment).
- Optimize the TaskManager memory setup so that a large share of it is managed memory; that gives the sorters more memory to work with, which means less spilling (see the config sketch below): https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_tuning.html#configure-memory-for-batch-jobs
- Make sure to configure a separate tmp directory on each SSD, so that the load is spread across all SSDs (also in the config sketch below).
- If you are saying the CPU load on a TM is only 40%, we have to assume the job is IO bound: is it the network or the disk(s)?
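To make the first point concrete, here is the shape of a type that Flink serializes with its own PojoSerializer (instead of falling back to Kryo) and can compare cheaply; the class and field names are made up:

    // A Flink POJO: public class, public no-arg constructor, and
    // public fields (or getters/setters) of supported types.
    public class Event {
        public long userId;    // primitive join key: cheap to serialize and compare
        public String payload;

        public Event() {}

        public Event(long userId, String payload) {
            this.userId = userId;
            this.payload = payload;
        }
    }

    // Keying by field name then uses Flink's POJO comparator:
    // left.coGroup(right).where("userId").equalTo("userId")

For the managed-memory and tmp-directory points, these are the relevant flink-conf.yaml entries; the values and paths below are illustrative, not recommendations:

    # Give batch operators a larger share of managed memory (the 1.11 default is 0.4).
    taskmanager.memory.managed.fraction: 0.6
    # One tmp directory per SSD, so spill files are spread across both disks.
    io.tmp.dirs: /mnt/ssd1/flink-tmp,/mnt/ssd2/flink-tmp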

I hope this is some helpful inspiration for improving the performance.


Re: Speeding up CoGroup in batch job

Ken Krugler
Hi Robert,

Thanks for the input. I did increase the amount of managed memory, and confirmed that both SSDs (on each slave) are being used for temp data.

I haven’t been able to figure out why the server CPU usage is low, but I did notice that it fluctuated from very low (10%) up to 95+%, with the average around 50%. But iowait never gets very high. I’m wondering if CPU is low while a lot of segments are being flushed to disk, and high while a lot of segments are being sorted before being flushed.

The main bottleneck is the CoGroup operation, which is in the phase where it’s writing all of the data to disk, in preparation for the sorted merge that does the grouping.

Looking at threads from a single dump of a TM process, most are WAITING, with counts like:

47 - requestMemorySegmentBlocking
70 - ReaderIterator.next
70 - AbstractRecordReader.getNextRecord

The only RUNNABLE threads that were doing anything interesting were all Kryo-related, which speaks to your point about ensuring I’m using POJOs.
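If it helps anyone else debugging this, I believe the Kryo fallback can be surfaced at submission time instead of in a thread dump (a sketch, not yet tried on my job):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Throws when a type would fall back to Kryo's generic serializer,
    // so non-POJO types show up as errors rather than as slow Kryo threads.
    env.getConfig().disableGenericTypes();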

I’m curious, after looking into the code, whether enabling object reuse would also help; I see different versions of the mergers being used, depending on whether it’s on or not.
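For reference, the switch I mean is the ExecutionConfig one (a sketch; I haven’t flipped it on this job yet):

    // Lets Flink reuse record instances during sorting/merging instead of
    // allocating a new object per record; user functions must then avoid
    // holding on to input objects across calls.
    env.getConfig().enableObjectReuse();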

Thanks again,

— Ken


--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr