Delta iteration not spilling to disk

Joshua Griffith
I’m currently using a delta iteration within a batch job and received the following error:

java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 11 maxPartition: 24 number of overflow segments: 0 bucketSize: 125 Overall memory: 23232512 Partition memory: 18350080 Message: null
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:457)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:392)
at org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector.collect(SolutionSetUpdateOutputCollector.java:54)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:96)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

It looks like the job ran out of Flink managed memory. Can delta iterations not spill to disk?

Thanks,

Joshua
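
Editorial note: to get a sense of scale, the byte counts in the error message can be converted to MiB and to memory segments. This is a minimal sketch, not Flink code. The class and method names are made up for illustration, and the 32 KiB segment size is an assumption (it is Flink's default; the thread does not say whether the job changed it).

```java
// Plain-Java arithmetic on the numbers from the "Memory ran out" message.
// HashTableMemory, toMiB and segments are hypothetical names for this sketch.
final class HashTableMemory {
    // Copied from the exception message.
    static final long OVERALL_BYTES = 23_232_512L;
    static final long PARTITION_BYTES = 18_350_080L;

    // Assumption: default Flink memory segment size of 32 KiB.
    static final long SEGMENT_BYTES = 32L * 1024L;

    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    static long segments(long bytes) {
        return bytes / SEGMENT_BYTES;
    }

    public static void main(String[] args) {
        System.out.println("overall MiB:        " + toMiB(OVERALL_BYTES));
        System.out.println("overall segments:   " + segments(OVERALL_BYTES));
        System.out.println("partition MiB:      " + toMiB(PARTITION_BYTES));
        System.out.println("partition segments: " + segments(PARTITION_BYTES));
        // Both values divide evenly by the assumed segment size.
        System.out.println("even split: "
                + (OVERALL_BYTES % SEGMENT_BYTES == 0 && PARTITION_BYTES % SEGMENT_BYTES == 0));
    }
}
```

Both totals are whole multiples of 32 KiB (709 and 560 segments), which is consistent with the assumed segment size. The hash table as a whole had only about 22 MiB available in this task, of which 17.5 MiB was partition memory.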

Re: Delta iteration not spilling to disk

Fabian Hueske-2
Hi Joshua,

That is correct: delta iterations cannot spill to disk. The solution set is kept in an in-memory hash table.
Spilling that hash table to disk would significantly hurt performance.

By default, the hash table lives in Flink's managed memory.
You can try increasing the managed memory size (by shifting the split between managed and heap memory, increasing the heap memory, ...), or add more resources and increase the parallelism.
Alternatively, you can store the solution set in a Java HashMap on the heap by setting the solution set to unmanaged (DeltaIteration.setSolutionSetUnManaged(true)).

Best, Fabian



Re: Delta iteration not spilling to disk

Joshua Griffith
Hello Fabian,

Thank you for your response. I tried setting the solution set to unmanaged and got a different error:

2017-10-24 20:46:11.473 [Join (join solution trees) (1/8)] ERROR org.apache.flink.runtime.operators.BatchTask  - Error in task code:  Join (join solution trees) (1/8)
java.lang.NullPointerException: null
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
at org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:207)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

I initially thought this was due to a null being present in the solution set tuple so I added assertions to ensure that tuple values were never null. However, I’m still getting the above error. Did changing it to unmanaged cause the tuples to be serialized? Is there another reason aside from null values that this error might be thrown?

Thank you,

Joshua


Re: Delta iteration not spilling to disk

Fabian Hueske-2
Hi Joshua,

With the unmanaged solution set, the records are not serialized, but they need to be copied to prevent the user-code JoinFunction from mutating them.
The stack trace suggests that the NPE is caused by copying a null record. This happens if the solution set does not contain the key.

I was not sure if there is a restriction of the delta iteration that all keys must be present in the initial solution set. I tried to find this in the documentation but didn't see information on that.
So I checked and was able to reproduce the problem.
It is only possible to join the solution set with keys that are actually contained in the solution set.

It's a bit surprising that this limitation is not documented and no proper exception is thrown. In fact it would be possible to avoid the exception by either:
- not calling the join function (this would be inner join semantics) or
- calling the join function with a null value (similar to an outer join).

I created a JIRA issue [1] to track the problem.

Best, Fabian
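
Editorial note: the two alternatives Fabian lists can be modelled in plain Java. This is a sketch of the semantics only, not Flink's implementation. The solution set is stood in for by a HashMap, and SolutionSetJoinModel, innerJoin, outerJoin and demo are hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of a lookup join against a keyed solution set.
final class SolutionSetJoinModel {

    // Inner-join semantics: workset keys missing from the solution set are skipped.
    static <K, V, R> List<R> innerJoin(Map<K, V> solutionSet, List<K> worksetKeys,
                                       BiFunction<K, V, R> joinFn) {
        List<R> out = new ArrayList<>();
        for (K key : worksetKeys) {
            V match = solutionSet.get(key);
            if (match != null) {
                out.add(joinFn.apply(key, match));
            }
        }
        return out;
    }

    // Outer-join-like semantics: the function is always called,
    // and receives null when the key is missing.
    static <K, V, R> List<R> outerJoin(Map<K, V> solutionSet, List<K> worksetKeys,
                                       BiFunction<K, V, R> joinFn) {
        List<R> out = new ArrayList<>();
        for (K key : worksetKeys) {
            out.add(joinFn.apply(key, solutionSet.get(key)));
        }
        return out;
    }

    // Runs both variants on a small example; key 3 is not in the solution set.
    static List<List<String>> demo() {
        Map<Long, String> solutionSet = new HashMap<>();
        solutionSet.put(1L, "a");
        solutionSet.put(2L, "b");
        List<Long> worksetKeys = Arrays.asList(1L, 3L);
        BiFunction<Long, String, String> joinFn = (k, v) -> k + "=" + v;

        List<List<String>> results = new ArrayList<>();
        results.add(innerJoin(solutionSet, worksetKeys, joinFn));
        results.add(outerJoin(solutionSet, worksetKeys, joinFn));
        return results;
    }

    public static void main(String[] args) {
        List<List<String>> results = demo();
        System.out.println("inner: " + results.get(0));
        System.out.println("outer: " + results.get(1));
    }
}
```

In this model the inner variant silently drops key 3, while the outer variant hands the join function a null for it, so the user code has to handle that case itself. The NPE in the thread corresponds to neither: the missing record was copied before any such check.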



Re: Delta iteration not spilling to disk

Joshua Griffith
Hi Fabian,

Switching the solution set join to a co-group indeed fixed the issue. Thank you!

Joshua
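
Editorial note: Joshua's actual code is not shown in the thread. The sketch below is a plain-Java model of why a co-group tolerates missing keys, under the assumption that a co-group passes an empty group, not a null record, when the solution set has no entry for a key. It is not Flink code; CoGroupModel, CoGroupFn and demo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of co-grouping workset records with a keyed solution set.
final class CoGroupModel {

    // User function: sees all workset records for a key plus the matching
    // solution set entries (zero or one in this model).
    interface CoGroupFn<W, S, R> {
        void coGroup(List<W> worksetGroup, List<S> solutionGroup, List<R> out);
    }

    static <K, W, S, R> List<R> coGroup(Map<K, List<W>> worksetByKey, Map<K, S> solutionSet,
                                        CoGroupFn<W, S, R> fn) {
        List<R> out = new ArrayList<>();
        for (Map.Entry<K, List<W>> entry : worksetByKey.entrySet()) {
            S match = solutionSet.get(entry.getKey());
            // A missing key becomes an empty group, so nothing null is copied or passed on.
            List<S> solutionGroup = (match == null)
                    ? Collections.<S>emptyList()
                    : Collections.singletonList(match);
            fn.coGroup(entry.getValue(), solutionGroup, out);
        }
        return out;
    }

    // Small example: key 1 exists in the solution set, key 3 does not.
    static List<String> demo() {
        Map<Long, List<Long>> worksetByKey = new LinkedHashMap<>();
        worksetByKey.put(1L, Arrays.asList(10L));
        worksetByKey.put(3L, Arrays.asList(30L));
        Map<Long, Long> solutionSet = new LinkedHashMap<>();
        solutionSet.put(1L, 100L);

        CoGroupFn<Long, Long, String> fn = (worksetGroup, solutionGroup, out) -> {
            String label = solutionGroup.isEmpty() ? "new:" : "update:";
            for (Long value : worksetGroup) {
                out.add(label + value);
            }
        };
        return coGroup(worksetByKey, solutionSet, fn);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the model is that the user function decides what to do with an empty solution-set group (here it labels the record as new), which is the case the plain solution set join could not express.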


Re: Delta iteration not spilling to disk

santoshg
Hi Joshua,

I am running into a similar problem. Can you explain your solution a bit
more? A code snippet would help.

Thanks




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/