Hash join exceeded exception


Hash join exceeded exception

Flavio Pompermaier
Hi to all,
I have this strange exception in my program, do you know what could be the cause of it?

java.lang.RuntimeException: Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys.
at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:720)
at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator.callWithNextKey(NonReusingBuildSecondHashMatchIterator.java:102)
at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:494)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:221)
at java.lang.Thread.run(Thread.java:745)

Best,
Flavio


Re: Hash join exceeded exception

Stephan Ewen
Hi Flavio!

The cause is usually what the exception message says: too many duplicate keys. The side that builds the hash table has one key occurring so often that not all records with that key fit into memory together, even after multiple out-of-core recursions.

Here is a list of things to check:

 - Is the key you join on actually a good join key? If there is essentially only one value for the join key, or one value occurs extremely often, the join is going to be very inefficient, and there is little one can do about that. In that case, re-examine the program and try to reformulate it.
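Before reformulating, it can help to measure how skewed the join key actually is. A minimal, standalone sketch (plain Java, not Flink) that counts how often the most frequent key occurs in a sample of the data:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeySkewCheck {
    // Returns the number of occurrences of the most frequent key.
    static <K> long maxKeyFrequency(List<K> keys) {
        Map<K, Long> counts = new HashMap<>();
        for (K k : keys) {
            counts.merge(k, 1L, Long::sum);
        }
        return counts.values().stream().mapToLong(Long::longValue).max().orElse(0L);
    }

    public static void main(String[] args) {
        // Heavily skewed sample: "A" dominates the key distribution.
        List<String> keys = List.of("A", "A", "A", "A", "A", "A", "B", "C");
        long max = maxKeyFrequency(keys);
        System.out.println("max frequency = " + max
                + ", fraction = " + ((double) max / keys.size()));
    }
}
```

If one key accounts for a large fraction of the records on the build side, that side cannot be partitioned down to memory-resident pieces, which is exactly the situation the exception describes.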

 - If you really need to do this join and are willing to live with the inefficiency, make the side with the duplicate keys the "probe side" of the hash join. You can use the JoinHint class to control that; have a look at the JavaDocs of that class. The sort-merge join also falls back to a block-nested-loop join for such cases (so it should always work), but it may be costly.
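A toy sketch of why the probe-side choice matters (plain Java, not Flink internals): only the build side is held in memory as a hash table, while probe records are streamed through one at a time, so duplicates on the probe side never need to fit in memory together. In Flink's DataSet API, hints such as JoinHint.REPARTITION_HASH_FIRST / REPARTITION_HASH_SECOND choose which input becomes the build side; check the JavaDocs for the exact semantics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ProbeSideDemo {
    // Build side: an in-memory hash table keyed by the join key.
    // Probe side: streamed record by record, so key duplicates are harmless.
    static List<String> hashJoin(Map<Integer, String> buildSide, List<Integer> probeSide) {
        List<String> out = new ArrayList<>();
        for (int key : probeSide) {
            String match = buildSide.get(key); // one lookup per probe record
            if (match != null) {
                out.add(key + ":" + match);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> build = Map.of(1, "x", 2, "y"); // distinct keys
        List<Integer> skewedProbe = List.of(1, 1, 1, 1, 2);  // many duplicates
        System.out.println(hashJoin(build, skewedProbe));
    }
}
```

The same join with the sides swapped would try to build a hash table containing every duplicate of the hot key, which is what triggers the recursion limit.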

 - If you are using a custom type, make sure that the hashCode() function is a good hash function. Otherwise, it may be that there are not too many duplicate keys, but too many duplicate hash codes, which has the same effect.
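To illustrate the last point, here is a hypothetical key type with a degenerate hashCode(): equals() distinguishes the records, but every instance hashes to the same value, so a hash table sees them all as one bucket, with the same effect as one massively duplicated key.

```java
import java.util.Objects;

public class HashCodeDemo {
    // Bad: constant hashCode() puts every record in the same hash bucket.
    static final class BadKey {
        final int id;
        BadKey(int id) { this.id = id; }
        @Override public boolean equals(Object o) {
            return o instanceof BadKey && ((BadKey) o).id == id;
        }
        @Override public int hashCode() { return 42; }
    }

    // Good: the hash code depends on the fields that equals() compares.
    static final class GoodKey {
        final int id;
        GoodKey(int id) { this.id = id; }
        @Override public boolean equals(Object o) {
            return o instanceof GoodKey && ((GoodKey) o).id == id;
        }
        @Override public int hashCode() { return Objects.hash(id); }
    }

    public static void main(String[] args) {
        System.out.println(new BadKey(1).hashCode() == new BadKey(2).hashCode());   // true: always collides
        System.out.println(new GoodKey(1).hashCode() == new GoodKey(2).hashCode()); // false for these values
    }
}
```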



BTW: That may be a good question for the Flink tag on Stack Overflow - then people can find the answer easily by googling.

Stephan



On Fri, Apr 17, 2015 at 2:03 PM, Flavio Pompermaier <[hidden email]> wrote: