Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device


Miguel Coimbra
Hello,

I have a problem for which I hope someone will be able to give a hint.
I am running a standalone Flink cluster in 2 Docker containers (1 JobManager and 1 TaskManager); the TaskManager has 30 GB of RAM.

The dataset is a large one: SNAP Friendster, which has around 1800 M edges.
https://snap.stanford.edu/data/com-Friendster.html

I am trying to run the Gelly built-in label propagation algorithm on top of it.
As this is a very big dataset, I believe I am exceeding the available RAM and the system is spilling to secondary storage, which then fails:


Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#894624508]
12/01/2016 17:58:24    Job execution switched to status RUNNING.
12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
12/01/2016 17:59:51    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FAILED
java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
    at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:86)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
    at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:344)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:502)



I do not have secondary storage limitations on the host system, so I believe the system would be able to handle whatever is spilled to the disk...
Perhaps this is a Docker limitation regarding the usage of the host's secondary storage?

Or is there perhaps some configuration or setting for the TaskManager which I am missing?
When running Gelly's label propagation on this dataset with this cluster configuration, what would the expected behavior be if the system consumes all the memory?


I believe the SortMerger thread is associated with the following mechanism described in this blog post:

https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
The Sort-Merge-Join works by first sorting both input data sets on their join key attributes (Sort Phase) and merging the sorted data sets as a second step (Merge Phase). The sort is done in-memory if the local partition of a data set is small enough. Otherwise, an external merge-sort is done by collecting data until the working memory is filled, sorting it, writing the sorted data to the local filesystem, and starting over by filling the working memory again with more incoming data. After all input data has been received, sorted, and written as sorted runs to the local file system, a fully sorted stream can be obtained. This is done by reading the partially sorted runs from the local filesystem and sort-merging the records on the fly. Once the sorted streams of both inputs are available, both streams are sequentially read and merge-joined in a zig-zag fashion by comparing the sorted join key attributes, building join element pairs for matching keys, and advancing the sorted stream with the lower join key.
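The spilling mechanism that paragraph describes (fill working memory, sort, spill a run to disk, repeat, then merge the runs on the fly) can be sketched in a few lines. This is an illustrative toy with a tiny buffer and line-based temp files, not Flink's actual implementation:

```python
# Sketch of an external merge sort: bounded working memory plus spilled runs.
# buffer_size and the file format are illustrative only.
import heapq
import os
import tempfile

def external_sort(records, buffer_size=4):
    """Sort an iterable of integers using bounded memory plus temp files."""
    run_files = []
    buf = []
    for rec in records:
        buf.append(rec)
        if len(buf) >= buffer_size:                # working memory is full:
            run_files.append(_spill(sorted(buf)))  # sort it and spill a run
            buf = []
    if buf:
        run_files.append(_spill(sorted(buf)))      # spill the final partial run
    # Merge phase: stream the sorted runs back and merge them on the fly.
    runs = [_read_run(path) for path in run_files]
    try:
        yield from heapq.merge(*runs)
    finally:
        for path in run_files:
            os.unlink(path)

def _spill(sorted_buf):
    """Write one sorted run to a temp file and return its path."""
    fd, path = tempfile.mkstemp()
    with os.fdopen(fd, "w") as f:
        for rec in sorted_buf:
            f.write(f"{rec}\n")
    return path

def _read_run(path):
    """Lazily stream one sorted run back from disk."""
    with open(path) as f:
        for line in f:
            yield int(line)
```

Note that every record that does not fit in the buffer is written to disk at least once, which is why the temp directory needs room for roughly the whole dataset.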

I am still investigating the possibility that Docker is at fault regarding secondary storage limitations, but how would I go about estimating the amount of disk space needed for this spilling on this dataset?
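As a back-of-envelope answer to my own question, assuming each edge record serializes to two 8-byte longs and that getUndirected() doubles the edge count (serialization framing, sort buffers, and multiple spilled copies per sort would only increase this, so treat it as a lower bound):

```python
# Rough lower bound on spill volume for the Friendster edge set.
EDGES = 1_806_067_135          # SNAP com-Friendster edge count
BYTES_PER_RECORD = 2 * 8       # two 8-byte longs per Tuple2<Long, Long> (assumed)
spill_bytes = EDGES * 2 * BYTES_PER_RECORD   # x2 for getUndirected()
print(f"~{spill_bytes / 1e9:.1f} GB per fully spilled copy of the edge set")
```

So a single spilled copy of the undirected edge list is already on the order of 58 GB, and a job with several sorts can need a multiple of that.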

Thanks for your time,

My best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

Fabian Hueske-2
Hi Miguel,

the exception does indeed indicate that the process ran out of available disk space.
The paragraph of the blog post you quoted describes exactly the situation in which you receive the IOException.

By default, the system's default temp dir is used. I don't know which folder that would be in a Docker setup.
You can configure the temp dir using the taskmanager.tmp.dirs config key.
Please see the configuration documentation for details [1].
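For example, in flink-conf.yaml (the path here is a placeholder; if I remember correctly, several directories can be listed for that key to spread spill files over multiple disks):

```
# flink-conf.yaml: directory the TaskManager uses for spilled sort files
taskmanager.tmp.dirs: /data/flink-tmp
```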

Hope this helps,

2016-12-02 0:18 GMT+01:00 Miguel Coimbra <[hidden email]>:


Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

Miguel Coimbra
In reply to this post by Miguel Coimbra
Hello Fabian,

I have created a directory in my user directory on the host machine ( /home/myuser/mydir ) and I am mapping it as a Docker volume into the TaskManager and JobManager containers.
Each container will thus have the following directory: /home/flink/htmp

host ---> container
/home/myuser/mydir ---> /home/flink/htmp

I had previously done this successfully with a host directory which holds several SNAP data sets.
In the Flink configuration file, I specified /home/flink/htmp to be used as the tmp dir for the TaskManager.
This seems to be working, as I was able to start the cluster and invoke Flink for that Friendster dataset.
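Concretely, the volume mapping and the configuration line together look like this (the image name and the elided flags are placeholders; the paths are the ones above):

```
# Host directory mounted into the TaskManager container (sketch):
docker run -v /home/myuser/mydir:/home/flink/htmp ... my-flink-taskmanager-image

# flink-conf.yaml inside the container:
taskmanager.tmp.dirs: /home/flink/htmp
```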

However, during execution, there were 2 intermediate files which kept growing until they reached about 30 GB.
At that point, the Flink TaskManager threw the exception again:

java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

Here is an ls excerpt of the directory on the host (to which the TaskManager container was also writing successfully) shortly before the exception:

31G 9d177a1971322263f1597c3378885ccf.channel
31G a693811249bc5f785a79d1b1b537fe93.channel


Now, I believe the host system is capable of storing hundreds of GB more, so I am confused as to what the problem might be.
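To rule out a container-level limit, one thing I can check is how much space the filesystem backing the temp dir actually reports from inside the container versus on the host (the container name here is a placeholder):

```
docker exec taskmanager df -h /home/flink/htmp   # view from inside the container
df -h /home/myuser/mydir                         # view from the host
```

If the two numbers disagree, the writes are landing on the Docker storage driver's filesystem rather than on the bind-mounted host directory.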

Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

Fabian Hueske-2
Hi Miguel,

have you found a solution to your problem?
I'm not a Docker expert, but this forum thread looks like it could be related to your problem [1].

Best,
Fabian

2016-12-02 17:43 GMT+01:00 Miguel Coimbra <[hidden email]>:

Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

Miguel Coimbra
In reply to this post by Miguel Coimbra
Hello Fabian,

Thanks for the attention. I still haven't solved this.
I did set up a cron job to clean the Docker images daily - thanks for that hint.
As a last resort, I am going to look into a 2 TB NAS to see if this works.

What is confusing me is that this also happens for the com-orkut.ungraph.txt dataset, which is much smaller than com-friendster.ungraph.txt and not that much bigger than com-dblp.ungraph.txt.

DBLP - no disk space error. I am able to run DBLP on one TaskManager container.
https://snap.stanford.edu/data/com-DBLP.html
Nodes 317080 ~0.3 M
Edges 1049866 ~1 M

Orkut - disk space error.
https://snap.stanford.edu/data/com-Orkut.html
Nodes 3072441 ~3 M
Edges 117185083 ~117 M

Friendster - disk space error.
https://snap.stanford.edu/data/com-Friendster.html
Nodes 65608366 ~65 M
Edges 1806067135 ~1800 M

For testing purposes, I'm using a JobManager (in its own Docker container) and a single TaskManager (in its own Docker container) with the following config parameters:

Heap is currently configured to 6 GB:
taskmanager.heap.mb: 6000

Parallelism is set as such:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 1


It is my understanding that if I want to test, for example, N = 3 TaskManagers (each in its own Docker container) with minimum parallelism within each, I would use:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 3



Fabian, do you think you could help estimate how much disk space would be required to compute the Orkut data set for example?
I am running a Flink 1.1.3 Docker cluster with a single TaskManager.
This is the code I am using to read SNAP datasets and to test with Orkut, Friendster and DBLP, in case you have a minute to inspect it and see if something is amiss:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexJoinFunction;
import org.apache.flink.graph.library.LabelPropagation;
import org.apache.flink.types.NullValue;

public class App {
    public static void main(String[] args) {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final String dataPath = args[0];

        final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
            .fieldDelimiter("\t") // node IDs are separated by tabs
            .ignoreComments("#")  // comments start with "#"
            .types(Long.class, Long.class);

        // Dealing with an undirected graph, so we call .getUndirected() at the end.
        final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
            edgeTuples,
            new MapFunction<Long, Long>() {
                private static final long serialVersionUID = 8713516577419451509L;
                public Long map(Long value) {
                    return value;
                }
            },
            env
        ).getUndirected();

        try {
            // Generate a unique ID value for each vertex.
            // Based on https://github.com/apache/flink/blob/master/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java
            DataSet<Tuple2<Long, Long>> idsWithInitialLabels = DataSetUtils.zipWithUniqueId(graph.getVertexIds())
                .map(
                    new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                        private static final long serialVersionUID = -6348050104902440929L;

                        @Override
                        public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple2) throws Exception {
                            return new Tuple2<Long, Long>(tuple2.f1, tuple2.f0);
                        }
                    }
                );

            // Build the graph with initialization values.
            final Graph<Long, Long, NullValue> graphWithIDs = graph
                .joinWithVertices(idsWithInitialLabels,
                    new VertexJoinFunction<Long, Long>() {
                        private static final long serialVersionUID = -315275119763760820L;
                        public Long vertexJoin(Long vertexValue, Long inputValue) {
                            return inputValue;
                        }
                    });

            // Execute LabelPropagation over it.
            DataSet<Vertex<Long, Long>> result = graphWithIDs.run(new LabelPropagation<Long, Long, NullValue>(10));

            graph.getVertices().print();

            TimeUnit.SECONDS.sleep(2);

            System.out.println("graphWithIDs");
            graphWithIDs.getVertices().print();
            graphWithIDs.getEdges().print();

            TimeUnit.SECONDS.sleep(2);

            // Group vertices by similar communities.
            final List<Vertex<Long, Long>> collected = result.collect();
            final HashMap<Long, ArrayList<Long>> commSizes = new HashMap<Long, ArrayList<Long>>();
            for (Vertex<Long, Long> v : collected) {
                //System.out.println("collected[v] = id:" + v.getId() + "\tval:" + v.getValue());
                if (!commSizes.containsKey(v.getValue())) {
                    commSizes.put(v.getValue(), new ArrayList<Long>());
                }
                commSizes.get(v.getValue()).add(v.getId());
            }

            System.out.println("#communities:\t" + commSizes.keySet().size()
                + "\n|result|:\t" + result.count()
                + "\n|collected|:\t" + collected.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


Thanks for your time,


Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


---------- Forwarded message ----------
From: Fabian Hueske <[hidden email]>
To: [hidden email]
Date: Mon, 5 Dec 2016 08:40:04 +0100
Subject: Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
Hi Miguel,

have you found a solution to your problem?
I'm not a docker expert but this forum thread looks like could be related to your problem [1].

Best,
Fabian

2016-12-02 17:43 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

I have created a directory on my host machine user directory ( /home/myuser/mydir ) and I am mapping it as a volume with Docker for the TaskManager and JobManager containers.
Each container will thus have the following directory /home/flink/htmp

host ---> container
/home/myuser/mydir ---> /home/flink/htmp

I had previously done this successfully with a host directory which holds several SNAP data sets.
In the Flink configuration file, I specified /home/flink/htmp to be used as the tmp dir for the TaskManager.
This seems to be working, as I was able to start the cluster and invoke Flink for that Friendster dataset.

However, during execution, there were 2 intermediate files which kept growing until they reached about 30 GB.
At that point, the Flink TaskManager threw the exception again:

java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

Here is an ls excerpt of the directory on the host (to which the TaskManager container was also writing successfully) shortly before the exception:

31G 9d177a1971322263f1597c3378885ccf.channel
31G a693811249bc5f785a79d1b1b537fe93.channel


Now I believe the host system is capable of storing hundreds of GBs more, so I am confused as to what the problem might be.

Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra

Hi Miguel,

the exception does indeed indicate that the process ran out of available disk space.
The quoted paragraph of the blog post describes the situation when you receive the IOE.

By default, the system's default tmp dir is used. I don't know which folder that would be in a Docker setup.
You can configure the temp dir using the taskmanager.tmp.dirs config key.
Please see the configuration documentation for details [1].
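For example (directory taken from this thread's Docker volume mapping; path is illustrative):

```yaml
# flink-conf.yaml (Flink 1.1.x): directory the TaskManager uses for spill files
taskmanager.tmp.dirs: /home/flink/htmp
```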

Hope this helps,

2016-12-02 0:18 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

I have a problem for which I hope someone will be able to give a hint.
I am running the Flink standalone cluster with 2 Docker containers (1 TaskManager and 1 JobManager) using 1 TaskManager with 30 GB of RAM.

The dataset is a large one: SNAP Friendster, which has around 1800 M edges.
https://snap.stanford.edu/data/com-Friendster.html

I am trying to run the Gelly built-in label propagation algorithm on top of it.
As this is a very big dataset, I believe I am exceeding the available RAM and that the system is using secondary storage, which then fails:


Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#894624508]
12/01/2016 17:58:24    Job execution switched to status RUNNING.
12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
12/01/2016 17:59:51    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FAILED
java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
    at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:86)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
    at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:344)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:502)



I do not have secondary storage limitations on the host system, so I believe the system would be able to handle whatever is spilled to the disk...
Perhaps this is a Docker limitation regarding the usage of the host's secondary storage?

Or is there perhaps some configuration or setting for the TaskManager which I am missing?
Running the label propagation of Gelly on this dataset and cluster configuration, what would be the expected behavior if the system consumes all the memory?


I believe the SortMerger thread is associated with the following mechanism described in this blog post:

https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
The Sort-Merge-Join works by first sorting both input data sets on their join key attributes (Sort Phase) and merging the sorted data sets as a second step (Merge Phase). The sort is done in-memory if the local partition of a data set is small enough. Otherwise, an external merge-sort is done by collecting data until the working memory is filled, sorting it, writing the sorted data to the local filesystem, and starting over by filling the working memory again with more incoming data. After all input data has been received, sorted, and written as sorted runs to the local file system, a fully sorted stream can be obtained. This is done by reading the partially sorted runs from the local filesystem and sort-merging the records on the fly. Once the sorted streams of both inputs are available, both streams are sequentially read and merge-joined in a zig-zag fashion by comparing the sorted join key attributes, building join element pairs for matching keys, and advancing the sorted stream with the lower join key.
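The external merge-sort the quoted paragraph describes can be sketched as follows, with in-memory lists standing in for the spill files on disk (a toy illustration only, not Flink's actual UnilateralSortMerger):

```java
import java.util.*;

public class ExternalSortSketch {
    // Sort 'input' while holding at most 'memBudget' records in memory:
    // fill the buffer, sort it, "spill" it as a sorted run, then k-way merge all runs.
    static List<Integer> externalSort(Iterator<Integer> input, int memBudget) {
        List<List<Integer>> runs = new ArrayList<>();      // stands in for spilled files
        List<Integer> buffer = new ArrayList<>(memBudget); // the "working memory"
        while (input.hasNext()) {
            buffer.add(input.next());
            if (buffer.size() == memBudget) {              // working memory is full:
                Collections.sort(buffer);                  // sort in memory
                runs.add(new ArrayList<>(buffer));         // spill the sorted run
                buffer.clear();                            // start filling again
            }
        }
        if (!buffer.isEmpty()) { Collections.sort(buffer); runs.add(buffer); }

        // Merge phase: repeatedly emit the smallest head element among all runs.
        PriorityQueue<int[]> heads = new PriorityQueue<>(Comparator.comparingInt((int[] a) -> a[0]));
        int[] pos = new int[runs.size()];
        for (int r = 0; r < runs.size(); r++)
            if (!runs.get(r).isEmpty()) heads.add(new int[]{runs.get(r).get(0), r});
        List<Integer> out = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] h = heads.poll();
            out.add(h[0]);
            int r = h[1];
            if (++pos[r] < runs.get(r).size()) heads.add(new int[]{runs.get(r).get(pos[r]), r});
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(5, 3, 9, 1, 7, 2, 8, 4, 6, 0);
        System.out.println(externalSort(data.iterator(), 3)); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```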

I am still investigating the possibility that Docker is at fault regarding secondary storage limitations, but how would I go about estimating the amount of disk space needed for this spilling on this dataset?
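A back-of-the-envelope lower bound (assumption: a serialized Tuple2<Long, Long> edge is just two 8-byte longs; real serialization and sort buffers add overhead on top):

```java
public class SpillEstimate {
    public static void main(String[] args) {
        // Friendster edge count from the SNAP page referenced in this thread.
        long edges = 1_806_067_135L;
        // getUndirected() also materializes the reverse of every edge.
        long directedEdges = 2 * edges;
        // Minimum serialized size of a Tuple2<Long, Long>: two 8-byte longs.
        long bytesPerEdge = 2 * Long.BYTES;
        double spillGiB = directedEdges * bytesPerEdge / (1024.0 * 1024.0 * 1024.0);
        // Roughly 54 GiB for a single full sorted spill of the edge set alone,
        // in the same ballpark as the two ~31 GB .channel files observed above.
        System.out.printf("~%.1f GiB lower bound per sort of the edge set%n", spillGiB);
    }
}
```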

Thanks for your time,

My best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra





Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

Fabian Hueske-2
Hi Miguel,

estimating the space requirements is not trivial. It depends of course on the algorithm and the data itself. I'm not an expert for graph algorithms and don't know your datasets.

But have you tried to run the algorithm in a non dockerized environment?
That might help to figure out if this is an issue with the Docker configuration rather than Flink.

Btw. if you want to run with a parallelism of 3, you need at least three slots: either 3 slots in one TM or 1 slot in each of three TMs.
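In flink-conf.yaml terms (illustrative), the single-TaskManager variant would be:

```yaml
taskmanager.numberOfTaskSlots: 3   # total slots = slots per TM × number of TMs, must be >= parallelism
parallelism.default: 3
```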

Best,
Fabian

2016-12-05 17:20 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thanks for the attention. Still haven't solved this.
I did set up a cron job to clean the Docker images daily - thanks for that hint.
As a last resort, I am going to look into a 2 TB NAS to see if this works.

What is confusing me is that this also happens for the com-orkut.ungraph.txt dataset, which is much smaller than com-friendster.ungraph.txt (though much bigger than com-dblp.ungraph.txt).

DBLP - I am able to run DBLP on one TaskManager container.
https://snap.stanford.edu/data/com-DBLP.html
Nodes: 317,080 (~0.3 M)
Edges: 1,049,866 (~1 M)

Orkut - disk space error.
https://snap.stanford.edu/data/com-Orkut.html
Nodes: 3,072,441 (~3 M)
Edges: 117,185,083 (~117 M)

Friendster - disk space error.
https://snap.stanford.edu/data/com-Friendster.html
Nodes: 65,608,366 (~65 M)
Edges: 1,806,067,135 (~1800 M)

For testing purposes, I'm using a JobManager (in its own Docker container) and a single TaskManager (in its own Docker container) with the following config parameters:

Heap is currently configured to 6 GB:
taskmanager.heap.mb: 6000

Parallelism is set as such:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 1


It is my understanding that if I want to test, for example, N = 3 TaskManagers (each in its own Docker container) with minimum parallelism within each, I would use:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 3



Fabian, do you think you could help estimate how much disk space would be required to process the Orkut data set, for example?
I am running a Flink 1.1.3 Docker cluster with a single TaskManager.
This is the code I am using to read SNAP datasets and to test with Orkut, Friendster and DBLP, in case you have a minute to inspect it and see if something is amiss:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexJoinFunction;
import org.apache.flink.graph.library.LabelPropagation;
import org.apache.flink.types.NullValue;

public class App {
    public static void main(String[] args) {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final String dataPath = args[0];

        final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
            .fieldDelimiter("\t") // node IDs are separated by tabs
            .ignoreComments("#")  // comments start with "#"
            .types(Long.class, Long.class); 
      
        // Dealing with an undirected graph, so we call .getUndirected() at the end.
        final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
            edgeTuples,
            new MapFunction<Long, Long>() {
                private static final long serialVersionUID = 8713516577419451509L;
                public Long map(Long value) {
                    return value;
                }
            },
            env
        ).getUndirected();

       
        try {
            // Generate a unique ID value for each vertex.
            // Based on https://github.com/apache/flink/blob/master/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java
            DataSet<Tuple2<Long, Long>> idsWithInitialLabels = DataSetUtils.zipWithUniqueId(graph.getVertexIds())
                .map(
                    new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                        private static final long serialVersionUID = -6348050104902440929L;
       
                        @Override
                        public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple2) throws Exception {
                            return new Tuple2<Long, Long>(tuple2.f1, tuple2.f0);
                        }
                    }
                );
           
            // Build the graph with initialization values.
            final Graph<Long, Long, NullValue> graphWithIDs = graph
                .joinWithVertices(idsWithInitialLabels,
                    new VertexJoinFunction<Long, Long>() {
                        private static final long serialVersionUID = -315275119763760820L;
                        public Long vertexJoin(Long vertexValue, Long inputValue) {
                            return inputValue;
                        }
                });
           
            // Execute LabelPropagation over it.
            DataSet<Vertex<Long, Long>> result = graphWithIDs.run(new LabelPropagation<Long, Long, NullValue>(10));
           
            graph.getVertices().print();
           
            TimeUnit.SECONDS.sleep(2);
           
            System.out.println("graphWithIDs");
            graphWithIDs.getVertices().print();
            graphWithIDs.getEdges().print();
           
            TimeUnit.SECONDS.sleep(2);   
           
            // Group vertices by similar communities.
            final List<Vertex<Long, Long>> collected = result.collect();
            final HashMap<Long, ArrayList<Long>> commSizes = new HashMap<Long, ArrayList<Long>>();
            for(Vertex<Long, Long> v : collected) {
                //System.out.println("collected[v] = id:" + v.getId() + "\tval:" + v.getValue());
                if(!commSizes.containsKey(v.getValue())) {
                    commSizes.put(v.getValue(), new ArrayList<Long>());
                }
                commSizes.get(v.getValue()).add(v.getId());
            }

           
            System.out.println("#communities:\t" + commSizes.keySet().size() + "\n|result|:\t" + result.count() + "\n|collected|:\t" + collected.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


Thanks for your time,


Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra




Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

Miguel Coimbra
Hello Fabian,

So if I want to have 10 nodes with one working thread each, I would just set this, I assume:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 10

There is progress, albeit little.
I am now running on a directory with more space.
For 10 iterations of label propagation, I am getting this error at the end (on the TaskManager).
I thought the execution was taking too much time, so I checked CPU usage of the TaskManager and it was really low.
Checking the log on the TaskManager, I found this error at the bottom in bold:


2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (1/1)
2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task                     - DataSink (collect()) (1/1) switched to FINISHED
2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for DataSink (collect()) (1/1)
2016-12-09 09:46:00,317 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FINISHED to JobManager for task IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (89eb2508cbda679502c2e0b258068274)
2016-12-09 09:46:00,317 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FINISHED to JobManager for task DataSink (collect()) (26b8f3950f4e736b0798d28c4bf967ed)
2016-12-09 09:46:04,080 ERROR akka.remote.EndpointWriter                                    - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@172.18.0.2:6123/user/jobmanager#1638751963]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage was 79885809 bytes.
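(For context: the 10485760-byte limit in this log equals Flink's default `akka.framesize` of 10 MB, which bounds messages such as `collect()`/`print()` results shipped through the JobManager. One possible mitigation, assuming you keep collecting large results to the client rather than writing them with a sink, is to raise it in flink-conf.yaml:)

```yaml
# flink-conf.yaml: raise the Akka frame size above the 79885809-byte payload seen here
akka.framesize: 104857600b
```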


Do you have any idea what this might be?

Kind regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
    at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:344)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:502)



I do not have secondary storage limitations on the host system, so I believe the system would be able to handle whatever is spilled to the disk...
Perhaps this is a Docker limitation regarding the usage of the host's secondary storage?

Or is there perhaps some configuration or setting for the TaskManager which I am missing?
Running the label propagation of Gelly on this dataset and cluster configuration, what would be the expected behavior if the system consumes all the memory?


I believe the SortMerger thread is associated to the following mechanism described in this blog post:

https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
The Sort-Merge-Join works by first sorting both input data sets on their join key attributes (Sort Phase) and merging the sorted data sets as a second step (Merge Phase). The sort is done in-memory if the local partition of a data set is small enough. Otherwise, an external merge-sort is done by collecting data until the working memory is filled, sorting it, writing the sorted data to the local filesystem, and starting over by filling the working memory again with more incoming data. After all input data has been received, sorted, and written as sorted runs to the local file system, a fully sorted stream can be obtained. This is done by reading the partially sorted runs from the local filesystem and sort-merging the records on the fly. Once the sorted streams of both inputs are available, both streams are sequentially read and merge-joined in a zig-zag fashion by comparing the sorted join key attributes, building join element pairs for matching keys, and advancing the sorted stream with the lower join key.
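The spill-and-merge phase described in that paragraph can be sketched in plain Java. This is my own simplified illustration (in-memory "runs" stand in for the spilled files, followed by a k-way merge with a priority queue), not Flink's actual SortMerger code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Simplified external merge-sort: sort fixed-size runs (standing in for
// spilled files), then k-way merge them to obtain one fully sorted stream.
public class ExternalSortSketch {

    static List<List<Integer>> buildSortedRuns(List<Integer> input, int runSize) {
        List<List<Integer>> runs = new ArrayList<>();
        for (int i = 0; i < input.size(); i += runSize) {
            List<Integer> run = new ArrayList<>(
                input.subList(i, Math.min(i + runSize, input.size())));
            Collections.sort(run); // in Flink, this run would be spilled to a tmp dir
            runs.add(run);
        }
        return runs;
    }

    static List<Integer> mergeRuns(List<List<Integer>> runs) {
        // Heap entry: {value, runIndex, positionInRun}
        PriorityQueue<int[]> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < runs.size(); r++) {
            if (!runs.get(r).isEmpty()) {
                heap.add(new int[]{runs.get(r).get(0), r, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            out.add(top[0]);
            int next = top[2] + 1;
            if (next < runs.get(top[1]).size()) {
                heap.add(new int[]{runs.get(top[1]).get(next), top[1], next});
            }
        }
        return out;
    }
}
```

In Flink, each sorted run is written to a .channel file under the configured tmp dirs, which is what the growing intermediate files observed earlier are.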

I am still investigating the possibility that Docker is at fault regarding secondary storage limitations, but how would I go about estimating the amount of disk space needed for this spilling on this dataset?
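One crude way to bound the spill volume (my own back-of-the-envelope, not a Flink formula) is edges × copies × bytes-per-record; the per-record byte count below is an assumption:

```java
// Hypothetical back-of-envelope estimate of the spill size for sorting an
// edge set: a Tuple2<Long, Long> carries 16 bytes of payload, plus some
// assumed serialization overhead; getUndirected() doubles the edge count.
public class SpillEstimate {

    static long estimateSpillBytes(long edges, boolean undirected, int bytesPerRecord) {
        long records = undirected ? 2 * edges : edges;
        return records * bytesPerRecord;
    }

    public static void main(String[] args) {
        long friendsterEdges = 1_806_067_135L; // SNAP Friendster edge count
        long bytes = estimateSpillBytes(friendsterEdges, true, 24);
        System.out.println(bytes / (1024L * 1024 * 1024) + " GiB");
    }
}
```

At an assumed 24 bytes per record this comes to roughly 80 GiB for a single sorted copy of the undirected edge set; the two ~31 GB .channel files observed (~62 GB total, i.e. ~17 bytes per record) are in the same ballpark, so the tmp volume needs far more headroom than the input file size suggests.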

Thanks for your time,

My best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra







Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

Fabian Hueske-2
It looks like the result you are trying to fetch with collect() is too large.
collect() only works for results up to 10 MB.

I would write the result to a file and read that file in.

Best, Fabian
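For reference, the 10485760-byte limit in the log corresponds to Flink's akka.framesize setting, which can in principle be raised in flink-conf.yaml; writing the result to a file remains the more robust route, since collect() still materializes everything on the client:

```
# flink-conf.yaml -- default is 10485760b (10 MB); the value below is illustrative
akka.framesize: 104857600b
```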

2016-12-09 16:30 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

So if I want to have 10 nodes with one working thread each, I would just set this, I assume:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 10

There is progress, albeit little.
I am now running on a directory with more space.
For 10 iterations of label propagation, I am getting this error at the end (on the TaskManager).
I thought the execution was taking too much time, so I checked CPU usage of the TaskManager and it was really low.
Checking the TaskManager log, I found this error at the bottom:


2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (1/1)
2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task                     - DataSink (collect()) (1/1) switched to FINISHED
2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for DataSink (collect()) (1/1)
2016-12-09 09:46:00,317 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FINISHED to JobManager for task IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (89eb2508cbda679502c2e0b258068274)
2016-12-09 09:46:00,317 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FINISHED to JobManager for task DataSink (collect()) (26b8f3950f4e736b0798d28c4bf967ed)
2016-12-09 09:46:04,080 ERROR akka.remote.EndpointWriter                                    - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@172.18.0.2:6123/user/jobmanager#1638751963]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage was 79885809 bytes.


Do you have any idea what this might be?

Kind regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 6 December 2016 at 19:57, Fabian Hueske <[hidden email]> wrote:
Hi Miguel,

estimating the space requirements is not trivial. It depends of course on the algorithm and the data itself. I'm not an expert for graph algorithms and don't know your datasets.

But have you tried to run the algorithm in a non dockerized environment?
That might help to figure out if this is an issue with the Docker configuration rather than Flink.

Btw. If you want to run with a parallelism of 3 you need at least three slots, either 3 three slots in one TM or 1 slot in each of three TMs.

Best,
Fabian

2016-12-05 17:20 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thanks for the attention. Still haven't solved this.
I did set up a cron job to clean the Docker images daily - thanks for that hint.
As a last resort, I am going to look into a 2 TB NAS to see if this works.

What is confusing me is that this also happens for the com-orkut.ungraph.txt dataset, which is much smaller than com-friendster.ungraph.txt but much bigger than com-dblp.ungraph.txt.

DBLP - I am able to run DBLP on one TaskManager container.
https://snap.stanford.edu/data/com-DBLP.html
Nodes 317080  ~0.3 M
Edges 1049866 ~ 1 M

Orkut - disk space error.
https://snap.stanford.edu/data/com-Orkut.html
Nodes 3072441 ~3 M
Edges 117185083 ~ 117 M

Friendster - disk space error.
https://snap.stanford.edu/data/com-Friendster.html
Nodes 65608366 ~65 M
Edges 1806067135 ~1800 M

For testing purposes, I'm using a JobManager (in its own Docker container), a single TaskManager (in its own Docker container) with the following config parameters:

Heap is currently configured to 6 GB:
taskmanager.heap.mb: 6000

Parallelism is set as such:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 1


It is my understanding that if I want to test for example N = 3 TaskManagers (each in its own Docker container) with minimum parallelism within each, I would use:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 3



Fabian, do you think you could help estimate how much disk space would be required to compute the Orkut data set for example?
I am running a Flink 1.1.3 Docker cluster with a single TaskManager.
This is the code I am using to read SNAP datasets and to test with Orkut, Friendster and DBLP, in case you have a minute to inspect it and see if something is amiss:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexJoinFunction;
import org.apache.flink.graph.library.LabelPropagation;
import org.apache.flink.types.NullValue;

public class App {
    public static void main(String[] args) {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final String dataPath = args[0];

        final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
            .fieldDelimiter("\t") // node IDs are tab-separated
            .ignoreComments("#")  // comments start with "#"
            .types(Long.class, Long.class);
      
        // Dealing with an undirected graph, so we call .getUndirected() at the end.
        final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
            edgeTuples,
            new MapFunction<Long, Long>() {
                private static final long serialVersionUID = 8713516577419451509L;
                @Override
                public Long map(Long value) {
                    return value;
                }
            },
            env
        ).getUndirected();

       
        try {
            // Generate a unique ID value for each vertex.
            // Based on https://github.com/apache/flink/blob/master/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java
            DataSet<Tuple2<Long, Long>> idsWithInitialLabels = DataSetUtils.zipWithUniqueId(graph.getVertexIds())
                .map(
                    new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                        private static final long serialVersionUID = -6348050104902440929L;
       
                        @Override
                        public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple2) throws Exception {
                            return new Tuple2<Long, Long>(tuple2.f1, tuple2.f0);
                        }
                    }
                );
           
            // Build the graph with initialization values.
            final Graph<Long, Long, NullValue> graphWithIDs = graph
                .joinWithVertices(idsWithInitialLabels,
                    new VertexJoinFunction<Long, Long>() {
                        private static final long serialVersionUID = -315275119763760820L;
                        public Long vertexJoin(Long vertexValue, Long inputValue) {
                            return inputValue;
                        }
                });
           
            // Execute LabelPropagation over it.
            DataSet<Vertex<Long, Long>> result = graphWithIDs.run(new LabelPropagation<Long, Long, NullValue>(10));
           
            graph.getVertices().print();
           
            TimeUnit.SECONDS.sleep(2);
           
            System.out.println("graphWithIDs");
            graphWithIDs.getVertices().print();
            graphWithIDs.getEdges().print();
           
            TimeUnit.SECONDS.sleep(2);   
           
            // Group vertices by similar communities.
            final List<Vertex<Long, Long>> collected = result.collect();
            final HashMap<Long, ArrayList<Long>> commSizes = new HashMap<Long, ArrayList<Long>>();
            for(Vertex<Long, Long> v : collected) {
                //System.out.println("collected[v] = id:" + v.getId() + "\tval:" + v.getValue());
                if(!commSizes.containsKey(v.getValue())) {
                    commSizes.put(v.getValue(), new ArrayList<Long>());
                }
                commSizes.get(v.getValue()).add(v.getId());
            }

           
            System.out.println("#communities:\t" + commSizes.keySet().size() + "\n|result|:\t" + result.count() + "\n|collected|:\t" + collected.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


Thanks for your time,


Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


---------- Forwarded message ----------
From: Fabian Hueske <[hidden email]>
To: [hidden email]
Date: Mon, 5 Dec 2016 08:40:04 +0100
Subject: Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

Hi Miguel,

have you found a solution to your problem?
I'm not a docker expert but this forum thread looks like could be related to your problem [1].

Best,
Fabian

2016-12-02 17:43 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

I have created a directory on my host machine user directory ( /home/myuser/mydir ) and I am mapping it as a volume with Docker for the TaskManager and JobManager containers.
Each container will thus have the following directory /home/flink/htmp

host ---> container
/home/myuser/mydir ---> /home/flink/htmp

I had previously done this successfully with the a host directory which holds several SNAP data sets.
In the Flink configuration file, I specified /home/flink/htmp to be used as the tmp dir for the TaskManager.
This seems to be working, as I was able to start the cluster and invoke Flink for that Friendster dataset.

However, during execution, there were 2 intermediate files which kept growing until they reached about 30 GB.
At that point, the Flink TaskManager threw the exception again:

java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

Here is an ls excerpt of the directory on the host (to which the TaskManager container was also writing successfully) shortly before the exception:

31G 9d177a1971322263f1597c3378885ccf.channel
31G a693811249bc5f785a79d1b1b537fe93.channel


Now I believe the host system is capable of storing hundreds of GB more, so I am confused as to what the problem might be.

Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra
