Hello everyone!
I have implemented a custom parallel hash join algorithm (without the windows feature) in order to compute the join of two input streams on a common attribute, using the CoFlatMap function and state. After the join operator (which has parallelism p = #processors) I have a map operation with parallelism 1, where I use the Meter component to measure the average throughput of the join. Finally, I use a DiscardingSink(), as I only care about the throughput and the final count of the join result. I keep two throughput values: the maximum average value I have ever seen and the mean of the average values I have seen.

I am running on a server with 48 processors and I expected the throughput to increase when the parallelism p becomes greater than 1. The same input stream is used in all cases. However, as you can see in the attached Excel file, not only does the throughput not increase with p, but the time for the Flink job to complete increases as well.

I have also read this thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-latency-throughput-td13170.html where Kostas Kloudas implied that Flink is not optimized for multiprocessor execution.

I am wondering whether this issue has to do with 1) the way I am measuring throughput, or 2) Flink internals that are not optimized for multiprocessor architectures. Any ideas or comments are welcome. Thanks in advance.

Best,
Max

experiments8_11_17.xlsx <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1161/experiments8_11_17.xlsx>
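P.S. To make the setup concrete, here is a minimal sketch of the kind of pipeline I mean. It is not my actual code: the class names (StatefulHashJoin, MeteringMap), the toy Tuple2 sources, the join on field f0, and the metric name joinThroughput are simplified placeholders.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.util.Collector;

public class ParallelJoinSketch {

    public static void main(String[] args) throws Exception {
        final int p = Runtime.getRuntime().availableProcessors();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Toy sources standing in for the two real input streams; f0 is the join attribute.
        DataStream<Tuple2<String, Integer>> left  = env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
        DataStream<Tuple2<String, Integer>> right = env.fromElements(Tuple2.of("a", 10), Tuple2.of("b", 20));

        DataStream<Tuple2<Integer, Integer>> joined = left
                .connect(right)
                .keyBy(0, 0)                       // partition both streams on the join attribute
                .flatMap(new StatefulHashJoin())   // stateful CoFlatMap doing the hash join
                .setParallelism(p);

        joined
                .map(new MeteringMap())            // Meter-based throughput measurement
                .setParallelism(1)                 // the single-threaded stage discussed below
                .addSink(new DiscardingSink<Tuple2<Integer, Integer>>());

        env.execute("parallel hash join sketch");
    }

    // Buffers the rows of both inputs per key and emits every match seen so far.
    public static class StatefulHashJoin extends
            RichCoFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<Integer, Integer>> {

        private transient ListState<Integer> leftRows;
        private transient ListState<Integer> rightRows;

        @Override
        public void open(Configuration parameters) {
            leftRows = getRuntimeContext().getListState(new ListStateDescriptor<>("left-rows", Integer.class));
            rightRows = getRuntimeContext().getListState(new ListStateDescriptor<>("right-rows", Integer.class));
        }

        @Override
        public void flatMap1(Tuple2<String, Integer> row, Collector<Tuple2<Integer, Integer>> out) throws Exception {
            leftRows.add(row.f1);
            for (Integer r : rightRows.get()) {
                out.collect(Tuple2.of(row.f1, r));
            }
        }

        @Override
        public void flatMap2(Tuple2<String, Integer> row, Collector<Tuple2<Integer, Integer>> out) throws Exception {
            rightRows.add(row.f1);
            for (Integer l : leftRows.get()) {
                out.collect(Tuple2.of(l, row.f1));
            }
        }
    }

    // Pass-through map that marks a Flink Meter once per record it forwards.
    public static class MeteringMap extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {

        private transient Meter throughput;

        @Override
        public void open(Configuration parameters) {
            throughput = getRuntimeContext().getMetricGroup()
                    .meter("joinThroughput", new MeterView(new SimpleCounter(), 10));
        }

        @Override
        public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
            throughput.markEvent();
            return value;
        }
    }
}

The parallelism-1 map is where the Meter lives, so every join result funnels through that single task.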
Hello!
I found out that the cause of the problem was the map with parallelism 1 that I had after the parallel join. When I changed it to .map(new MyMapMeter).setParallelism(p), the completion time decreases as I increase the parallelism p, which is reasonable. Somehow that map was a bottleneck in my parallel execution plan, but I had it this way in order to measure a valid average throughput.

So, my question is the following: how can I measure the average throughput of my parallel join operation properly?

Best,
Max
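P.S. One idea I am considering is to register the Meter inside the join operator itself, so that each parallel subtask reports its own rate and the per-subtask values can be summed afterwards. A rough sketch of what I mean, reusing the StatefulHashJoin class from the sketch in my first message; the metric name joinThroughput and the collector wrapper are only illustrative:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.util.Collector;

public class MeteredHashJoin extends ParallelJoinSketch.StatefulHashJoin {

    private transient Meter joinThroughput;

    @Override
    public void open(Configuration parameters) {
        super.open(parameters);
        // One meter per parallel subtask; the reported rates can be summed afterwards.
        joinThroughput = getRuntimeContext().getMetricGroup()
                .meter("joinThroughput", new MeterView(new SimpleCounter(), 10));
    }

    @Override
    public void flatMap1(Tuple2<String, Integer> row, Collector<Tuple2<Integer, Integer>> out) throws Exception {
        super.flatMap1(row, metered(out));
    }

    @Override
    public void flatMap2(Tuple2<String, Integer> row, Collector<Tuple2<Integer, Integer>> out) throws Exception {
        super.flatMap2(row, metered(out));
    }

    // Wraps the collector so that every emitted join result also marks the meter.
    private Collector<Tuple2<Integer, Integer>> metered(final Collector<Tuple2<Integer, Integer>> out) {
        return new Collector<Tuple2<Integer, Integer>>() {
            @Override
            public void collect(Tuple2<Integer, Integer> record) {
                joinThroughput.markEvent();
                out.collect(record);
            }

            @Override
            public void close() {
                out.close();
            }
        };
    }
}

Usage would then be .flatMap(new MeteredHashJoin()).setParallelism(p), with no extra metering map afterwards.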
Hi,

Yes, as you correctly analysed, parallelism 1 was causing problems, because it meant that all of the records had to be gathered over the network from all of the task managers. Keep in mind that even if you increase the parallelism to p, every change in parallelism can slow down your application, because events will have to be redistributed, which in most cases means network transfers.

For measuring throughput you could use the metrics already defined in Flink: you can get the list of vertices of your job, then the statistics for each vertex, and you can also try to aggregate them across subtasks.

Piotrek
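As an illustration of the suggestion above, the per-vertex metrics can be read from the JobManager's monitoring REST API. The sketch below assumes a JobManager REST address of http://localhost:8081 and takes the job id and vertex id as program arguments; the metric names (0.numRecordsOutPerSecond, 1.numRecordsOutPerSecond, and so on, one per subtask) depend on the operator's parallelism and the Flink version, so calling the metrics endpoint without ?get= first will show which names are actually available.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class FetchJoinThroughput {
    public static void main(String[] args) throws Exception {
        // Assumed JobManager REST address; job id and vertex id are passed as arguments.
        String base = "http://localhost:8081";
        String jobId = args[0];    // e.g. from GET /jobs (or the web UI)
        String vertexId = args[1]; // e.g. from GET /jobs/<jobId>, one entry per operator/vertex

        // Ask for two per-subtask output-rate meters of the join vertex.
        String url = base + "/jobs/" + jobId + "/vertices/" + vertexId
                + "/metrics?get=0.numRecordsOutPerSecond,1.numRecordsOutPerSecond";

        try (BufferedReader in = new BufferedReader(new InputStreamReader(new URL(url).openStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // JSON list of {"id": ..., "value": ...} entries
            }
        }
    }
}

Summing the per-subtask numRecordsOutPerSecond values gives the total output rate of the parallel join without routing every record through a single task.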