Hello, the performance of apply function after join

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Hello, the performance of apply function after join

Philip Lee
Hello, the performance of apply function after join.


Just for your information, I am running Flink job on the cluster consisted of 9 machine with each 48 cores. I am working on some benchmark with comparison of Flink, Spark-Sql, and Hive.

I tried to optimize join function with Hint for better performance. I want to increase the performance as much as possible.

Here are Questions===
1) When seeing job progress log, apply() after join function seems like it takes a bit long time. Do you think if I do not use apply() to format tuples, I would gain the better performance? Well, I could set just the column number instead of apply()

2) on using join with Hint like Huge or Tiny, is there the ideal ratio regarding to the size of two tables? For me, if some table is 10 times bigger than the other table, I use join with Hint. Otherwise, I usually use the general join().

Best,
Phil






Reply | Threaded
Open this post in threaded view
|

Re: Hello, the performance of apply function after join

Fabian Hueske-2
Hi Phil,

an apply method after a join runs pipelined with the join, i.e., it starts processing when the first join result is emitted and finishes after it handled the last join result.
Unless the logic in your apply function is not terribly complex, this should be OK. If you do not specify an apply method, a default method will be used which returns a Tuple2(left, right).

Regarding the join hints, there is no general rule when to use joinWithHuge / joinWithTiny. It depends on the number of machines, machine specs, number of records, record size, etc...
If you use joinWithHuge/Tiny, the smaller side will be broadcasted to every node and each parallel partition will hold the full relation in memory, i.e., if the smaller side is 10GB, you need at least 10GB for each task manager slot. So this should only be used if the smaller side is *really* small.

The join method does also allow to specify more fine-grained hints such as:

small.join(large, JoinHint.REPARTITION_HASH_SECOND)

which will execute the join by shuffling both inputs and building a hash table on the partition of the second input.
If you want to optimize for performance, you should try both hints: REPARTITION_HASH_*small* and BROADCAST_HASH_*small*.

Best, Fabian

2015-12-01 21:34 GMT+01:00 Philip Lee <[hidden email]>:
Hello, the performance of apply function after join.


Just for your information, I am running Flink job on the cluster consisted of 9 machine with each 48 cores. I am working on some benchmark with comparison of Flink, Spark-Sql, and Hive.

I tried to optimize join function with Hint for better performance. I want to increase the performance as much as possible.

Here are Questions===
1) When seeing job progress log, apply() after join function seems like it takes a bit long time. Do you think if I do not use apply() to format tuples, I would gain the better performance? Well, I could set just the column number instead of apply()

2) on using join with Hint like Huge or Tiny, is there the ideal ratio regarding to the size of two tables? For me, if some table is 10 times bigger than the other table, I use join with Hint. Otherwise, I usually use the general join().

Best,
Phil