Dear community,

I have a problem which I hope you'll be able to help with. I apologize in advance for the verbosity of the post.

I am running a Flink standalone cluster (not even storing to the filesystem) with 2 Docker containers. The Dockerfile builds an image for Flink 1.1.2, which is the same version the .jar's main class was compiled against. The image is configured to use Java 8, which is what the project's pom.xml requires as well.

I have also edited the TaskManager's conf/flink-conf.yaml to have the following values:

....
taskmanager.heap.mb: 7512
....
taskmanager.network.numberOfBuffers: 16048
....

Properties of this host/Docker setup:
- host machine has 256 GB of RAM
- JobManager container is running with the default Flink config
- TaskManager has 7.5 GB of memory available
- TaskManager number of buffers is 16048, which is very generous compared to the default value

I am testing on the SNAP DBLP dataset: https://snap.stanford.edu/data/com-DBLP.html
It has:

317080 nodes
1049866 edges

These are the relevant parts of the project's pom.xml (note: the project executes without error for local executions without the cluster):

....
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.1.2</flink.version>
</properties>
.....
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-gelly_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>

I am running (what I believe to be) a simple Gelly application, performing the ConnectedComponents algorithm with 30 iterations:

public static void main(String[] args) {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    final String dataPath = args[0];

    final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
            .fieldDelimiter("\t")  // node IDs are separated by tabs
            .ignoreComments("#")   // comment lines start with "#"
            .types(Long.class, Long.class);

    try {
        System.out.println("Tuple size: " + edgeTuples.count());
    } catch (Exception e1) {
        e1.printStackTrace();
    }

    /*
     * @param <K> the key type for edge and vertex identifiers
     * @param <VV> the value type for vertices
     * @param <EV> the value type for edges
     * public class Graph<K, VV, EV>
     */
    final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
            edgeTuples,
            new MapFunction<Long, Long>() {
                private static final long serialVersionUID = 8713516577419451509L;
                public Long map(Long value) {
                    return value;
                }
            },
            env
    );

    try {
        /*
         * @param <K> key type
         * @param <VV> vertex value type
         * @param <EV> edge value type
         *
         * class ConnectedComponents<K, VV extends Comparable<VV>, EV>
         *     implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, VV>>>
         */
        DataSet<Vertex<Long, Long>> verticesWithComponents =
                graph.run(new ConnectedComponents<Long, Long, NullValue>(30));
        System.out.println("Component count: " + verticesWithComponents.count());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

However, the following is output on the host machine on execution:

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink run -m 3de7625b8e28:6123 -c flink.graph.example.App /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar /home/myuser/com-dblp.ungraph.txt

Cluster configuration: Standalone cluster with JobManager at /172.19.0.2:6123
Using address 172.19.0.2:6123 to connect to JobManager.
JobManager web interface address http://172.19.0.2:8081
Starting execution of program
Submitting job with JobID: fd6a12896b749e9ed439bbb196c6aaae. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-658812967]
11/08/2016 21:22:44 DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
11/08/2016 21:22:44 DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
11/08/2016 21:22:44 DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
11/08/2016 21:22:44 DataSink (count())(1/1) switched to SCHEDULED
11/08/2016 21:22:44 DataSink (count())(1/1) switched to DEPLOYING
11/08/2016 21:22:44 DataSink (count())(1/1) switched to RUNNING
11/08/2016 21:22:44 DataSink (count())(1/1) switched to FINISHED
11/08/2016 21:22:44 DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to FINISHED
11/08/2016 21:22:44 Job execution switched to status FINISHED.
Tuple size: 1049866
Submitting job with JobID: d68d6d775cc222d9fd0728d9666e83de.
Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-658812967]
11/08/2016 21:22:45 Job execution switched to status RUNNING.
11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to SCHEDULED
11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to DEPLOYING
11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to RUNNING
11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to RUNNING
11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to SCHEDULED
11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to DEPLOYING
11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to RUNNING
11/08/2016 21:22:45 CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED
11/08/2016 21:22:45 CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING
11/08/2016 21:22:45 CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to RUNNING
11/08/2016 21:22:47 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to FINISHED
11/08/2016 21:22:47 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to FINISHED
11/08/2016 21:22:48 CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to FINISHED
11/08/2016 21:22:48 IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to SCHEDULED
11/08/2016 21:22:48 IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to DEPLOYING
11/08/2016 21:22:48 IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to RUNNING
11/08/2016 21:22:48 IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to FAILED
java.lang.IllegalArgumentException: Too few memory segments provided. Hash Table needs at least 33 memory segments.
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
    at org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
    at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
11/08/2016 21:22:48 Job execution switched to status FAILING.
java.lang.IllegalArgumentException: Too few memory segments provided. Hash Table needs at least 33 memory segments.
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
    at org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
    at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

The results I found online so far were not enough, and I am not sure of the best way to solve this. If anyone can help diagnose and correct this issue, I would be very thankful.

Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra
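For context on what the exception above is complaining about: the iteration head builds its solution-set hash table out of Flink's managed-memory segments, and it refuses to start with fewer than 33 of them (33 segments of 32 KiB is only about 1 MiB, so hitting this limit means the operator's managed-memory share was tiny). The following is a hypothetical reconstruction of that guard for illustration only, not the actual Flink source (the real check lives in CompactingHashTable, per the stack trace):

```java
// Hypothetical sketch of the guard behind the error above (not Flink's code).
public class SegmentGuard {
    // Minimum quoted in the exception message.
    static final int MIN_NUM_SEGMENTS = 33;

    static void checkSegments(int availableSegments) {
        if (availableSegments < MIN_NUM_SEGMENTS) {
            throw new IllegalArgumentException(
                    "Too few memory segments provided. Hash Table needs at least "
                    + MIN_NUM_SEGMENTS + " memory segments.");
        }
    }

    public static void main(String[] args) {
        checkSegments(33); // enough segments: passes silently
        try {
            checkSegments(10); // too few: reproduces the failure mode
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In other words, the failure is not about the dataset size; it means the TaskManager simply did not have enough managed memory available when the iteration started.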
Hi Miguel,

I tried to reproduce the problem in a similar setup using Flink in Docker with 2 workers:

CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
3fbaf5876e31 melentye/flink:latest "docker-entrypoint.sh" 3 minutes ago Up 3 minutes 6121-6122/tcp flinkdocker_flink-worker_2
bd87bfa6c03d melentye/flink:latest "docker-entrypoint.sh" 4 minutes ago Up 3 minutes 6121-6122/tcp flinkdocker_flink-worker_1
16c7607b3ec2 melentye/flink:latest "docker-entrypoint.sh" 4 minutes ago Up 4 minutes 0.0.0.0:8081->8081/tcp, 6123/tcp, 0.0.0.0:9010->9010/tcp flinkdocker_flink-master_1

for i in $(docker ps --filter name=flink --format={{.ID}}); do
    docker cp ~/Downloads/com-dblp.ungraph.txt $i:/com-dblp.ungraph.txt
done

for i in $(docker ps --filter name=flink-master --format={{.ID}}); do
    docker cp ~/Dev/flink-playground/target/flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar $i:/flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar
done

docker exec -it $(docker ps --filter name=flink-master --format={{.ID}}) flink run -c com.github.melentye.flink.gelly.DblpConnectedComponents /flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar /com-dblp.ungraph.txt

where flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar contains your code example. It seems to work better for me:

Cluster configuration: Standalone cluster with JobManager at /172.20.0.2:6123
Using address 172.20.0.2:6123 to connect to JobManager.
JobManager web interface address http://172.20.0.2:8081
Starting execution of program
Submitting job with JobID: 3146a443dd208c57f5b795767da0b720. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.20.0.2:6123/user/jobmanager#73377916]
11/10/2016 20:10:26 Job execution switched to status RUNNING.
11/10/2016 20:10:26 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
11/10/2016 20:10:26 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
11/10/2016 20:10:26 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
11/10/2016 20:10:26 DataSink (count())(1/1) switched to SCHEDULED
11/10/2016 20:10:26 DataSink (count())(1/1) switched to DEPLOYING
11/10/2016 20:10:26 DataSink (count())(1/1) switched to RUNNING
11/10/2016 20:10:27 DataSink (count())(1/1) switched to FINISHED
11/10/2016 20:10:27 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to FINISHED
11/10/2016 20:10:27 Job execution switched to status FINISHED.
Tuple size: 1049866
Submitting job with JobID: 76040d6474efa39a8f0a03f420e8746c. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.20.0.2:6123/user/jobmanager#73377916]
11/10/2016 20:10:27 Job execution switched to status RUNNING.
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
11/10/2016 20:10:27 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
11/10/2016 20:10:27 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to DEPLOYING
11/10/2016 20:10:27 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to RUNNING
11/10/2016 20:10:27 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to RUNNING
11/10/2016 20:10:27 CoGroup (Messaging)(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CoGroup (Messaging)(1/1) switched to DEPLOYING
11/10/2016 20:10:28 CoGroup (Messaging)(1/1) switched to RUNNING
11/10/2016 20:10:28 CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED
11/10/2016 20:10:28 CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING
11/10/2016 20:10:28 CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to RUNNING
11/10/2016 20:10:29 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to FINISHED
11/10/2016 20:10:29 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to FINISHED
11/10/2016 20:10:30 CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to FINISHED
11/10/2016 20:10:30 IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to SCHEDULED
11/10/2016 20:10:30 IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to DEPLOYING
11/10/2016 20:10:30 IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to RUNNING
11/10/2016 20:10:30 CoGroup (Vertex State Updates)(1/1) switched to SCHEDULED
11/10/2016 20:10:30 CoGroup (Vertex State Updates)(1/1) switched to DEPLOYING
11/10/2016 20:10:30 CoGroup (Vertex State Updates)(1/1) switched to RUNNING
11/10/2016 20:10:33 Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to SCHEDULED
11/10/2016 20:10:33 Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to DEPLOYING
11/10/2016 20:10:33 Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to RUNNING
11/10/2016 20:10:46 DataSink (count())(1/1) switched to SCHEDULED
11/10/2016 20:10:46 DataSink (count())(1/1) switched to DEPLOYING
11/10/2016 20:10:46 Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to FINISHED
11/10/2016 20:10:46 CoGroup (Vertex State Updates)(1/1) switched to FINISHED
11/10/2016 20:10:46 CoGroup (Messaging)(1/1) switched to FINISHED
11/10/2016 20:10:46 DataSink (count())(1/1) switched to RUNNING
11/10/2016 20:10:46 IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to FINISHED
11/10/2016 20:10:46 DataSink (count())(1/1) switched to FINISHED
11/10/2016 20:10:46 Job execution switched to status FINISHED.
Component count: 317080
Program execution finished
Job with JobID 76040d6474efa39a8f0a03f420e8746c has finished.
Job Runtime: 18907 ms
Accumulator Results:
- 50c72721df1e3fb4d82d5d60be4e24e8 (java.lang.Long): 317080

There are minor differences from your setup: I'm using Flink 1.1.3 and the default settings for taskmanager.heap.mb and taskmanager.network.numberOfBuffers. Is that something you can try as well, to see if a newer Flink version with default settings works better for you?

Andrey

On Tue, Nov 8, 2016 at 10:35 PM, Miguel Coimbra <[hidden email]> wrote:
In reply to this post by Miguel Coimbra
What do the TaskManager logs say with regard to the allocation of managed memory?
Something like: "Limiting managed memory to ... of the currently free heap space ..., memory will be allocated lazily."

Looping in Greg and Vasia, who maintain Gelly and are most familiar with the internals.

– Ufuk

On 8 November 2016 at 22:35:22, Miguel Coimbra ([hidden email]) wrote:
Hello,

I believe I have figured this out.

First, I tried Andrey Melentyev's suggestion of executing with Apache Flink 1.1.3, both with the default conf/flink-conf.yaml parameters and with some changes to provide additional memory. However, the same error happened. Note: I changed my project's pom.xml and generated the .jar again using Maven, and copied the new .jar to both Docker instances.

The test machine has 256 GB RAM, and it is a scenario of two Docker containers. I send attached the relevant parts of the logs of the JobManager and of the TaskManager.

Regarding memory in the TaskManager log, I was looking at a couple of executions and noticed something strange:

2016-11-14 15:48:45,256 INFO org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
2016-11-14 15:48:45,413 INFO org.apache.flink.runtime.taskmanager.TaskManager - Limiting managed memory to 0.7 of the currently free heap space (310 MB), memory will be allocated lazily.
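As a side note, the numbers in the log above can be sanity-checked with some simple arithmetic: managed memory is capped at 0.7 of the free heap, and each segment is 32768 bytes. This sketch only estimates the total segment count for the whole TaskManager; how those segments are split between concurrently running operators is internal to Flink and not shown here:

```java
// Rough arithmetic based on the figures in the TaskManager log above.
// Assumption: 32768 bytes per segment, as logged by the NetworkBufferPool.
public class ManagedMemoryEstimate {
    static final long SEGMENT_SIZE = 32 * 1024; // bytes per segment, from the log

    static long segmentsFor(long managedMemoryBytes) {
        return managedMemoryBytes / SEGMENT_SIZE;
    }

    public static void main(String[] args) {
        // ~310 MB of managed memory, as reported in the log.
        long managed = 310L * 1024 * 1024;
        System.out.println(segmentsFor(managed)); // prints 9920
    }
}
```

So ~310 MB yields 9920 segments in total, but each operator in the plan only receives a share of that pool; with the 512 MB default heap shown below, an iteration's hash table can still end up below the 33-segment minimum.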
After that, I looked at the start of the TaskManager log and found this:

2016-11-14 15:48:38,843 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.1.3, Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC)
2016-11-14 15:48:38,843 INFO org.apache.flink.runtime.taskmanager.TaskManager - Current user: flink
2016-11-14 15:48:38,844 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
2016-11-14 15:48:38,844 INFO org.apache.flink.runtime.taskmanager.TaskManager - Maximum heap size: 512 MiBytes
2016-11-14 15:48:38,844 INFO org.apache.flink.runtime.taskmanager.TaskManager - JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.7.2
2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM Options:
2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager -     -XX:+UseG1GC
2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager -     -Xms512M
2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager -     -Xmx512M
2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager -     -XX:MaxDirectMemorySize=8388607T

It seems it is running with only 512 MB, which is the default, in spite of my having edited the flink-conf.yaml file before invoking the program on the cluster. I looked at the log of the JobManager and the same thing happened: it was using the default 256 MB instead of my 1024 MB.

To recap, I built the Docker Flink image with (I send the Dockerfile attached):

cd docker-flink-image-builder/
ls
Dockerfile Dockerfile~ README.md README.md~ bluemix-docker-compose.sh* build.sh* docker-compose-bluemix.yml
./build.sh

The only file I changed from those is the Dockerfile. This set of files was obtained from the Flink repository.
I used docker-compose up to start the standalone cluster:

screen
cd docker-flink-image-builder/
ls
Dockerfile  Dockerfile~  README.md  README.md~  bluemix-docker-compose.sh*  build.sh*  docker-compose-bluemix.yml  docker-compose.yml  docker-entrypoint.sh*
docker-compose up

Then I accessed each Docker instance:

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) /bin/sh
docker exec -it $(docker ps --filter name=dockerflinkimagebuilder_taskmanager_1 --format={{.ID}}) /bin/sh

While inside each of them, I started a bash shell and changed the config file:

bash
cd /home/myuser/docker-image-build-context/flink-1.1.3/conf
vi flink-conf.yaml

On both the JobManager and the TaskManager, I edited the following settings:

# The heap size for the JobManager JVM
jobmanager.heap.mb: 1024
# The heap size for the TaskManager JVM
taskmanager.heap.mb: 4096
# The number of buffers for the network stack.
taskmanager.network.numberOfBuffers: 4096

It seems that changes I make to flink-conf.yaml are only picked up after I kill the cluster and call docker-compose up again. I then submitted the job:

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink run -m 707a534982e6:6123 -c flink.graph.example.App /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar /home/myuser/com-dblp.ungraph.txt

The cluster now started with the correct memory values, but the result was the same error (it is in the logs). I then doubled the memory again, so that I had:

# The heap size for the JobManager JVM
jobmanager.heap.mb: 2048
# The heap size for the TaskManager JVM
taskmanager.heap.mb: 8192

After this, I killed the cluster with CTRL+C on the screen that had started it (a graceful exit). This time, after starting again with docker-compose up, I launched the program and it worked! However, there is something I don't understand, perhaps because I am new to the Docker ecosystem: when do changes to the flink-conf.yaml file actually take effect?
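Since the interactive vi step is the tedious part, the same edits can be done non-interactively. A small sketch of what I mean (for demonstration it writes a sample conf file in the current directory; inside the containers the target would be the flink-conf.yaml under the path shown above, and the sed expressions assume each key already exists on its own line):

```shell
#!/bin/sh
# Sketch: rewrite heap/buffer settings in flink-conf.yaml without an editor.
# For the demo we create a small sample file with the default-style values.
CONF=flink-conf.yaml
cat > "$CONF" <<'EOF'
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.network.numberOfBuffers: 2048
EOF

# Replace an existing "key: value" line in place (assumes the key is present).
set_key() {
    sed -i "s/^$1:.*/$1: $2/" "$CONF"
}

set_key jobmanager.heap.mb 2048
set_key taskmanager.heap.mb 8192
set_key taskmanager.network.numberOfBuffers 16048
```

Something like this could be run via docker exec in each container, but the cluster would still need a restart before the new values are used.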
From my understanding, I have to do this:

1 - Launch the cluster with docker-compose up.
2 - docker exec -it into each of the Docker instances and manually edit the configuration file.
3 - CTRL+C to gracefully kill the cluster.
4 - Relaunch the cluster - it will now display the correct heap values for the JobManager and TaskManager.

This is cumbersome. I know I can write my own scripts to automate this, but is this really the correct way to launch a Flink standalone cluster on Docker with custom memory options? Should I instead change the Dockerfile to include a custom flink-conf.yaml when building the image, so that the settings are in place right from the start? What is the correct way to tackle this? Thank you very much!

The output is below in case you are curious:

myuser@myserver:~/docker-flink-image-builder$ docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink run -m 707a534982e6:6123 -c flink.graph.example.App /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar /home/myuser/com-dblp.ungraph.txt
Cluster configuration: Standalone cluster with JobManager at /172.19.0.2:6123
Using address 172.19.0.2:6123 to connect to JobManager.
JobManager web interface address http://172.19.0.2:8081
Starting execution of program
Submitting job with JobID: 55544e0ebc1f5014df53b200974afdbf. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-1305686264]
11/14/2016 17:13:33  Job execution switched to status RUNNING.
11/14/2016 17:13:33  DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
11/14/2016 17:13:33  DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
11/14/2016 17:13:33  DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
11/14/2016 17:13:34  DataSink (count())(1/1) switched to SCHEDULED
11/14/2016 17:13:34  DataSink (count())(1/1) switched to DEPLOYING
11/14/2016 17:13:34  DataSink (count())(1/1) switched to RUNNING
11/14/2016 17:13:36  DataSink (count())(1/1) switched to FINISHED
11/14/2016 17:13:36  DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to FINISHED
Tuple size: 1049866
Submitting job with JobID: ab0931dc89e4a86de17549eeb518fde6. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-1305686264]
11/14/2016 17:13:37  Job execution switched to status RUNNING.
11/14/2016 17:13:37  CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
11/14/2016 17:13:37  CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
11/14/2016 17:13:37  CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
11/14/2016 17:13:39  CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
11/14/2016 17:13:39  CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
11/14/2016 17:13:39  CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
11/14/2016 17:13:39  CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to SCHEDULED
11/14/2016 17:13:39  CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to DEPLOYING
11/14/2016 17:13:39  CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to RUNNING
11/14/2016 17:13:39  CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to RUNNING
11/14/2016 17:13:40  CoGroup (Messaging)(1/1) switched to SCHEDULED
11/14/2016 17:13:40  CoGroup (Messaging)(1/1) switched to DEPLOYING
11/14/2016 17:13:40  CoGroup (Messaging)(1/1) switched to RUNNING
11/14/2016 17:13:44  CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED
11/14/2016 17:13:44  CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING
11/14/2016 17:13:44  CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to RUNNING
11/14/2016 17:13:49  CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to FINISHED
11/14/2016 17:13:50  CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to FINISHED
11/14/2016 17:13:54  CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to FINISHED
11/14/2016 17:13:54  IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to SCHEDULED
11/14/2016 17:13:54  IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to DEPLOYING
11/14/2016 17:13:54  IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to RUNNING
11/14/2016 17:13:55  CoGroup (Vertex State Updates)(1/1) switched to SCHEDULED
11/14/2016 17:13:55  CoGroup (Vertex State Updates)(1/1) switched to DEPLOYING
11/14/2016 17:13:55  CoGroup (Vertex State Updates)(1/1) switched to RUNNING
11/14/2016 17:14:06  Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to SCHEDULED
11/14/2016 17:14:06  Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to DEPLOYING
11/14/2016 17:14:06  Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to RUNNING
11/14/2016 17:15:00  Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to FINISHED
11/14/2016 17:15:00  DataSink (count())(1/1) switched to SCHEDULED
11/14/2016 17:15:00  DataSink (count())(1/1) switched to DEPLOYING
11/14/2016 17:15:00  CoGroup (Vertex State Updates)(1/1) switched to FINISHED
11/14/2016 17:15:00  DataSink (count())(1/1) switched to RUNNING
11/14/2016 17:15:00  CoGroup (Messaging)(1/1) switched to FINISHED
11/14/2016 17:15:00  DataSink (count())(1/1) switched to FINISHED
11/14/2016 17:15:00  IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to FINISHED
11/14/2016 17:15:00  Job execution switched to status FINISHED.
Component count: 317080
Program execution finished
Job with JobID ab0931dc89e4a86de17549eeb518fde6 has finished.
Job Runtime: 83229 ms
Accumulator Results:
- e6c358969906b4ce1d682d6840281848 (java.lang.Long): 317080

Thank you for your attention. It seems solved.

Kind regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra

On 14 November 2016 at 09:26, Ufuk Celebi <[hidden email]> wrote:
What do the TaskManager logs say wrt the allocation of managed memory?
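P.S. To make my question concrete, the Dockerfile approach I have in mind would look roughly like this. This is an untested sketch, not the actual Dockerfile from the Flink repository: the base image line, FLINK_HOME path, and the assumption that a pre-edited flink-conf.yaml sits next to the Dockerfile are all illustrative.

```dockerfile
# Untested sketch - base image and paths are assumptions, not the real files.
FROM java:8-jre-alpine

ENV FLINK_HOME /opt/flink-1.1.3

# ... the existing steps that download and unpack Flink into $FLINK_HOME ...

# Bake the tuned configuration into the image, replacing the shipped default,
# so both containers come up with the right heap sizes on the first
# docker-compose up, with no manual edit-and-restart cycle.
COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
```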
Hi Miguel,

I'm sorry for the late reply; this e-mail got stuck in my spam folder. I'm glad that you've found a solution :)

I've never used Flink with Docker, so I'm probably not the best person to advise you on this. However, if I understand correctly, you're changing the configuration while the Flink cluster is already running, before submitting the job. I don't know whether Docker is supposed to behave differently, but once a Flink cluster has been started, the nodes won't reload any changes you make to flink-conf.yaml. You'll either have to make your changes before starting the cluster or restart it afterwards.

Cheers,
-Vasia.

On 14 November 2016 at 18:33, Miguel Coimbra <[hidden email]> wrote: