Hash join failing


Sebastian Schelter-2
Hi,

What can I do to give Flink more memory when running it from my IDE? I'm
getting the following exception:

Caused by: java.lang.RuntimeException: Hash join exceeded maximum number
of recursions, without reducing partitions enough to be memory resident.
Probably cause: Too many duplicate keys.
        at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:720)
        at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
        at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
        at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
        at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
        at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:494)
        at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:223)
        at java.lang.Thread.run(Thread.java:745)

Re: Hash join failing

Stephan Ewen
If you hit this case, giving Flink more memory only fights a symptom, not the cause.

If you really have that many duplicate keys in the data set (and not just a bad implementation of "hashCode()"), try the following:

1) Reverse the hash join sides. Duplicates hurt only on the build side, not on the probe side, so this works if the other input has far fewer duplicate keys. You can do this with a JoinHint.

2) Switch to a sort-merge join. It will be slow with very many duplicate keys, but it should not break.

Let me know how it works!
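For reference, both suggestions can be expressed with the DataSet API's JoinHint overload of join(). This is only a sketch: the data sets, tuple types, and key positions are made up, but the JoinHint constants are the real ones.

```java
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class JoinHintSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical inputs: "skewed" has many duplicate keys in field 0,
        // "clean" has mostly distinct keys.
        DataSet<Tuple2<Long, String>> skewed = env.fromElements(
                new Tuple2<>(1L, "a"), new Tuple2<>(1L, "b"), new Tuple2<>(1L, "c"));
        DataSet<Tuple2<Long, String>> clean = env.fromElements(
                new Tuple2<>(1L, "x"), new Tuple2<>(2L, "y"));

        // 1) Reverse the hash join sides: build the hash table from the
        //    second input (the one with fewer duplicate keys) and probe
        //    with the skewed one.
        skewed.join(clean, JoinHint.REPARTITION_HASH_SECOND)
              .where(0).equalTo(0)
              .print();

        // 2) Or sidestep the hash table entirely with a sort-merge join.
        skewed.join(clean, JoinHint.REPARTITION_SORT_MERGE)
              .where(0).equalTo(0)
              .print();
    }
}
```

Without a hint, the optimizer picks a strategy on its own (JoinHint.OPTIMIZER_CHOOSES); the hint just pins the execution strategy without changing the join's semantics.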

On Tue, May 26, 2015 at 10:22 PM, Sebastian <[hidden email]> wrote:
Hi,

What can I do to give Flink more memory when running it from my IDE? I'm getting the following exception:

[...]


Re: Hash join failing

Sebastian Schelter-2
Switching the sides worked (I tried that shortly after sending the mail).

Thanks for the fast response :)

On 26.05.2015 22:26, Stephan Ewen wrote:

> [...]