Hello,

I am running a standalone Flink cluster with 2 Docker containers (1 JobManager and 1 TaskManager); the TaskManager has 30 GB of RAM. The dataset is a large one: SNAP Friendster, which has around 1800 M edges:
https://snap.stanford.edu/data/com-Friendster.html

I am trying to run the Gelly built-in label propagation algorithm on top of it. As this is a very big dataset, I believe I am exceeding the available RAM and the system is spilling to secondary storage, which then fails:

Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#894624508]
12/01/2016 17:58:24  Job execution switched to status RUNNING.
12/01/2016 17:58:24  DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
12/01/2016 17:58:24  DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
12/01/2016 17:58:24  DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
12/01/2016 17:58:24  Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
12/01/2016 17:58:24  Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
12/01/2016 17:58:24  Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
12/01/2016 17:59:51  Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FAILED

java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
    at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:86)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
    at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:344)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:502)

I do not have secondary storage limitations on the host system, so I believe it should be able to handle whatever is spilled to disk. Perhaps this is a Docker limitation regarding the usage of the host's secondary storage? Or is there some configuration or setting for the TaskManager which I am missing?

Running the Gelly label propagation on this dataset and cluster configuration, what would be the expected behavior if the system consumes all the memory?

I believe the SortMerger thread is associated with the mechanism described in this blog post:
https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html

"The Sort-Merge-Join works by first sorting both input data sets on their join key attributes (Sort Phase) and merging the sorted data sets as a second step (Merge Phase). The sort is done in-memory if the local partition of a data set is small enough. Otherwise, an external merge-sort is done by collecting data until the working memory is filled, sorting it, writing the sorted data to the local filesystem, and starting over by filling the working memory again with more incoming data. After all input data has been received, sorted, and written as sorted runs to the local file system, a fully sorted stream can be obtained. This is done by reading the partially sorted runs from the local filesystem and sort-merging the records on the fly. Once the sorted streams of both inputs are available, both streams are sequentially read and merge-joined in a zig-zag fashion by comparing the sorted join key attributes, building join element pairs for matching keys, and advancing the sorted stream with the lower join key."

I am still investigating the possibility that Docker is at fault regarding secondary storage limitations, but how would I go about estimating the amount of disk space needed for this spilling on this dataset?

Thanks for your time,

My best regards,
Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra
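On the question of estimating the spill size, a back-of-envelope sketch is possible. The assumptions below are mine, not from the thread: each Tuple2<Long, Long> edge record serializes to roughly 2 * 8 = 16 bytes, and getUndirected() doubles the number of edge records.

```java
public class SpillEstimate {
    // Rough lower bound on the bytes spilled when externally sorting the edge set.
    // Assumptions (hypothetical): ~16 bytes per serialized Tuple2<Long, Long>,
    // and getUndirected() materializes each edge in both directions.
    static long estimateSpillBytes(long edges) {
        long undirectedEdges = 2L * edges;     // both directions are materialized
        long bytesPerEdge = 2L * Long.BYTES;   // two 8-byte longs per record
        return undirectedEdges * bytesPerEdge;
    }

    public static void main(String[] args) {
        long friendsterEdges = 1_806_067_135L; // edge count from the SNAP page above
        long bytes = estimateSpillBytes(friendsterEdges);
        System.out.printf("~%.1f GB for one sorted copy of the edge set%n", bytes / 1e9);
    }
}
```

By this estimate, a single sorted copy of the Friendster edge set is already close to 58 GB, and the merge phase may temporarily hold both the sorted runs and the merged output on disk, so actual peak usage can be a small multiple of that. The iterative label propagation adds further intermediate data per superstep.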
Hi Miguel,

the exception does indeed indicate that the process ran out of available disk space. The quoted paragraph of the blog post describes the situation in which you receive the IOException. By default, the system's default temp directory is used; I don't know which folder that would be in a Docker setup. You can configure the temp directories using the taskmanager.tmp.dirs config key. Please see the configuration documentation for details [1].

Hope this helps,
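For illustration, the setting Fabian mentions could look like this in flink-conf.yaml (a sketch; the directory path is hypothetical, and multiple directories may be listed):

```yaml
# flink-conf.yaml (Flink 1.1.x) -- hypothetical path
# Directories where the TaskManager writes its spill files (sorter runs, etc.)
taskmanager.tmp.dirs: /home/flink/htmp
```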
2016-12-02 0:18 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

I have created a directory in my user directory on the host machine (/home/myuser/mydir) and I am mapping it as a Docker volume into both the TaskManager and JobManager containers. Each container will thus have the following directory:

host ---> container
/home/myuser/mydir ---> /home/flink/htmp

I had previously done this successfully with a host directory which holds several SNAP datasets. In the Flink configuration file, I specified /home/flink/htmp to be used as the tmp dir for the TaskManager. This seems to be working, as I was able to start the cluster and invoke Flink on the Friendster dataset.

However, during execution, there were 2 intermediate files which kept growing until they reached about 30 GB. At that point, the Flink TaskManager threw the exception again:

java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

Here is an ls excerpt of the directory on the host (to which the TaskManager container was also writing successfully) shortly before the exception:

31G 9d177a1971322263f1597c3378885ccf.channel
31G a693811249bc5f785a79d1b1b537fe93.channel

The host system is capable of storing hundreds of GBs more, so I am confused as to what the problem might be.

Best regards,
Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra
Hi Miguel,

have you found a solution to your problem?

2016-12-02 17:43 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thanks for the attention. I still haven't solved this. I did set up a cron job to clean the Docker images daily - thanks for that hint. As a last resort, I am going to look into a 2 TB NAS to see if this works.

What is confusing me is that this also happens for the com-orkut.ungraph.txt dataset, which is much smaller than com-friendster.ungraph.txt but not that much bigger than com-dblp.ungraph.txt:

DBLP - runs successfully on one TaskManager container. https://snap.stanford.edu/
Nodes: 317080 (~0.3 M), Edges: 1049866 (~1 M)

Orkut - fails with the "No space left on device" error. https://snap.stanford.edu/
Nodes: 3072441 (~3 M), Edges: 117185083 (~117 M)

Friendster - fails with the "No space left on device" error. https://snap.stanford.edu/
Nodes: 65608366 (~65 M), Edges: 1806067135 (~1800 M)

For testing purposes, I'm using a JobManager (in its own Docker container) and a single TaskManager (in its own Docker container) with the following config parameters.

Heap is currently configured to 6 GB:
taskmanager.heap.mb: 6000

Parallelism is set as such:
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1

It is my understanding that if I want to test, for example, N = 3 TaskManagers (each in its own Docker container) with minimum parallelism within each, I would use:
taskmanager.numberOfTaskSlots: 1
parallelism.default: 3

Fabian, do you think you could help estimate how much disk space would be required to compute the Orkut dataset, for example? I am running a Flink 1.1.3 Docker cluster with a single TaskManager.
This is the code I am using to read SNAP datasets and to test with Orkut, Friendster and DBLP, in case you have a minute to inspect it and see if something is amiss:

public class App {
    public static void main(String[] args) {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final String dataPath = args[0];

        final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
            .fieldDelimiter("\t")  // node IDs are separated by tabs
            .ignoreComments("#")   // comments start with "#"
            .types(Long.class, Long.class);

        // Dealing with an undirected graph, so we call .getUndirected() at the end.
        final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
            edgeTuples,
            new MapFunction<Long, Long>() {
                private static final long serialVersionUID = 8713516577419451509L;
                public Long map(Long value) { return value; }
            },
            env
        ).getUndirected();

        try {
            // Generate a unique ID value for each vertex.
            // Based on https://github.com/apache/flink/blob/master/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java
            DataSet<Tuple2<Long, Long>> idsWithInitialLabels = DataSetUtils
                .zipWithUniqueId(graph.getVertexIds())
                .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    private static final long serialVersionUID = -6348050104902440929L;
                    @Override
                    public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple2) throws Exception {
                        return new Tuple2<Long, Long>(tuple2.f1, tuple2.f0);
                    }
                });

            // Build the graph with initialization values.
            final Graph<Long, Long, NullValue> graphWithIDs = graph
                .joinWithVertices(idsWithInitialLabels,
                    new VertexJoinFunction<Long, Long>() {
                        private static final long serialVersionUID = -315275119763760820L;
                        public Long vertexJoin(Long vertexValue, Long inputValue) {
                            return inputValue;
                        }
                    });

            // Execute LabelPropagation over it.
            DataSet<Vertex<Long, Long>> result = graphWithIDs
                .run(new LabelPropagation<Long, Long, NullValue>(10));

            graph.getVertices().print();
            TimeUnit.SECONDS.sleep(2);

            System.out.println("graphWithIDs");
            graphWithIDs.getVertices().print();
            graphWithIDs.getEdges().print();
            TimeUnit.SECONDS.sleep(2);

            // Group vertices by similar communities.
            final List<Vertex<Long, Long>> collected = result.collect();
            final HashMap<Long, ArrayList<Long>> commSizes = new HashMap<Long, ArrayList<Long>>();
            for (Vertex<Long, Long> v : collected) {
                //System.out.println("collected[v] = id:" + v.getId() + "\tval:" + v.getValue());
                if (!commSizes.containsKey(v.getValue())) {
                    commSizes.put(v.getValue(), new ArrayList<Long>());
                }
                commSizes.get(v.getValue()).add(v.getId());
            }
            System.out.println("#communities:\t" + commSizes.keySet().size()
                + "\n|result|:\t" + result.count()
                + "\n|collected|:\t" + collected.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Thanks for your time,
Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra
Hi Miguel,

estimating the space requirements is not trivial. It depends, of course, on the algorithm and the data itself. I'm not an expert in graph algorithms and I don't know your datasets.

2016-12-05 17:20 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

So if I want to have 10 nodes with one working thread each, I assume I would just set:
taskmanager.numberOfTaskSlots: 1
parallelism.default: 10

There is progress, albeit little. I am now running on a directory with more space. For 10 iterations of label propagation, I am getting this error at the end (on the TaskManager). I thought the execution was taking too much time, so I checked the CPU usage of the TaskManager and it was really low. Checking the TaskManager log, I found this error at the bottom:

2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (1/1)
2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task - DataSink (collect()) (1/1) switched to FINISHED
2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for DataSink (collect()) (1/1)
2016-12-09 09:46:00,317 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FINISHED to JobManager for task IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (89eb2508cbda679502c2e0b258068274)
2016-12-09 09:46:00,317 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FINISHED to JobManager for task DataSink (collect()) (26b8f3950f4e736b0798d28c4bf967ed)
2016-12-09 09:46:04,080 ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@172.18.0.2:6123/user/jobmanager#1638751963]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage was 79885809 bytes.

Do you have any idea what this might be?

Kind regards,
Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra

On 6 December 2016 at 19:57, Fabian Hueske <[hidden email]> wrote:
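The "max allowed size 10485760 bytes" in this log corresponds to Flink's default Akka frame size (10 MB), which caps the size of a single message sent to the JobManager. As an illustration only (the 100 MB value below is an arbitrary example, not a recommendation), it can be raised in flink-conf.yaml:

```yaml
# flink-conf.yaml
# Maximum size of a single Akka message; the default is 10485760b (10 MB).
# 104857600b (100 MB) is an arbitrary example value.
akka.framesize: 104857600b
```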
Hi Miguel,

it looks like the result you are trying to fetch with collect() is too large. collect() only works for results up to about 10 MB.

2016-12-09 16:30 GMT+01:00 Miguel Coimbra <[hidden email]>:
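The usual way around this limit is to write a large result to a sink instead of collecting it to the client. A sketch, assuming the DataSet<Vertex<Long, Long>> result from the program earlier in the thread (the output path is hypothetical; Vertex is a Tuple2 subclass in Gelly, so writeAsCsv can be applied to it directly):

```java
// Instead of: List<Vertex<Long, Long>> collected = result.collect();
// write the (vertexId, label) pairs to CSV files on the TaskManagers.
result.writeAsCsv("/home/flink/htmp/labels.csv", FileSystem.WriteMode.OVERWRITE);

// Sinks are lazy in the DataSet API, so trigger the job explicitly.
env.execute("Label propagation on SNAP dataset");
```

Any per-community counting can then be done on the written files, or with further DataSet transformations, rather than in the driver program.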