Weird performance on custom Hashjoin w.r.t. parallelism

Weird performance on custom Hashjoin w.r.t. parallelism

m@xi
Hello everyone!

I have implemented a custom parallel hash join algorithm (without using the
windows feature) in order to calculate the join of two input streams on a
common attribute, using a CoFlatMap function and state. After the join
operator (which has parallelism p = #processors) I have a map operation
(with parallelism 1) where I am using the Meter component to measure the
average throughput of the join operation. Finally, I am using a
DiscardingSink(), as I only care about the throughput and the final count
of the join's result. I maintain 2 throughput values: the MAX average value
I have ever seen and the AVG of the average values I have seen.
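
Roughly, the job looks like this; a minimal sketch, where the class names
(FirstSource, SecondSource, MyHashJoin, MyMapMeter) and the tuple types are
just placeholders for my own classes:

int p = Runtime.getRuntime().availableProcessors();   // p = #processors

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// the two input streams, joined on their first field
DataStream<Tuple2<Integer, String>> s1 = env.addSource(new FirstSource());
DataStream<Tuple2<Integer, String>> s2 = env.addSource(new SecondSource());

s1.connect(s2)
  .keyBy(0, 0)                          // partition both inputs on the join attribute
  .flatMap(new MyHashJoin())            // stateful CoFlatMapFunction holding the hash tables
  .setParallelism(p)                    // p = #processors
  .map(new MyMapMeter())                // Meter-based average-throughput measurement
  .setParallelism(1)                    // single task: every join result is sent to it
  .addSink(new DiscardingSink<>());

env.execute("parallel hash join");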

I am running on a server with 48 processors and I expect throughput to get
higher when the parallelism p becomes > 1. The same input stream is used in
all cases.

However, as you can see in the attached Excel file, not only does the
throughput not increase as p increases, but the time for the Flink job to
execute increases as well.

I have also read this:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-latency-throughput-td13170.html
where Kostas Kloudas implied that Flink is not optimized for
multiprocessor execution.

I am wondering whether this issue has to do with 1) the way I am measuring
the throughput, or 2) Flink internals that are not optimized for a
multiprocessor architecture.

Any ideas or comments are welcome.

Thanks in advance.

Best,
Max

experiments8_11_17.xlsx
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1161/experiments8_11_17.xlsx>  



Re: Weird performance on custom Hashjoin w.r.t. parallelism

m@xi
Hello!

I found out that the cause of the problem was the parallelism-1 map that I
have after the parallel join.
When I changed it to .map(new MyMapMeter).setParallelism(p), the completion
time decreases as I increase the parallelism p, which is reasonable. That
map was a bottleneck in my parallel execution plan, but I had it this way
in order to measure a valid average throughput.
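
In code the only difference is the parallelism of the measuring map, i.e.
(continuing the placeholder names from my first mail):

s1.connect(s2)
  .keyBy(0, 0)
  .flatMap(new MyHashJoin()).setParallelism(p)
  .map(new MyMapMeter()).setParallelism(p)   // was setParallelism(1), a single-task bottleneck
  .addSink(new DiscardingSink<>());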

So, my question is the following:

How can I measure the average throughput of my parallel join operation
properly?

Best,
Max



Re: Weird performance on custom Hashjoin w.r.t. parallelism

Piotr Nowojski
Hi,

Yes, as you correctly analysed, parallelism 1 was causing problems, because it meant that all of the records had to be gathered over the network from all of the task managers. Keep in mind that even if you increase the parallelism to "p", every change in parallelism can slow down your application, because events have to be redistributed, which in most cases means network transfers.
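
If you want to see where such redistributions happen in your job, one option is to print the execution plan before submitting it; a small sketch (the JSON it prints describes, for every operator, how data is exchanged with its predecessors, e.g. chained/forward vs. hash-partitioned or rebalanced over the network):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ... build the join pipeline as usual ...
System.out.println(env.getExecutionPlan());  // JSON plan, can also be pasted into the Flink plan visualizer
env.execute();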

For measuring throughput you could use the metrics already defined in Flink, such as the records-out rate that every vertex reports. Via the JobManager's REST API you can get the list of vertices of your job, query the throughput metric of the join vertex, and also try to aggregate it across the subtasks; a sketch of such queries is below.
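
For instance (host/port, ids and metric names below are only illustrative, and the exact REST paths can differ slightly between Flink versions):

List the vertices of your job:
  http://<jobmanager>:8081/jobs/<job-id>

Read the throughput of one parallel subtask of the join vertex:
  http://<jobmanager>:8081/jobs/<job-id>/vertices/<vertex-id>/metrics?get=0.numRecordsOutPerSecond

To aggregate, query the other subtasks the same way (1.numRecordsOutPerSecond, 2.numRecordsOutPerSecond, ...) and sum the values up.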

Piotrek
