Too few memory segments provided. Hash Table needs at least 33 memory segments.


Miguel Coimbra
Dear community,

I have a problem which I hope you'll be able to help with.
I apologize in advance for the verbosity of the post.
I am running the Flink standalone cluster (not even storing to the filesystem) with 2 Docker containers.

I set the image of the Dockerfile to Flink 1.1.2, the same version used by the main class in the .jar.
The Docker image is configured to use Java 8, which the project's pom.xml also requires.
I have also edited the TaskManager's conf/flink-conf.yaml to set the following values:

....
taskmanager.heap.mb: 7512
....
taskmanager.network.numberOfBuffers: 16048
....


Properties of this host/docker setup:
- host machine has 256 GB of RAM
- job manager container is running with default flink config
- task manager has 7.5 GB of memory available
- task manager number of buffers is 16048, which is very generous compared to the default value
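For completeness, these are the related managed-memory settings I did NOT change (sketch; the values shown are what I believe are the 1.1 defaults, not values I set):

```yaml
# Settings left at their defaults (values below are my understanding of the 1.1 defaults):
taskmanager.memory.fraction: 0.7        # share of free heap used as managed memory
taskmanager.memory.segment-size: 32768  # one memory segment = 32 KB
taskmanager.numberOfTaskSlots: 1        # managed memory is divided across slots
```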

I am testing on the SNAP DBLP dataset: https://snap.stanford.edu/data/com-DBLP.html
It has:

 317080 nodes
1049866 edges

These are the relevant parts of the pom.xml of the project:
(note: the project executes without error for local executions without the cluster)

....
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.1.2</flink.version>
  </properties>
.....
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.10</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-gelly_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>


I am running (what I believe to be) a simple Gelly application, performing the ConnectedComponents algorithm with 30 iterations:

public static void main(String[] args) {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

       
        final String dataPath = args[0];

        final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
            .fieldDelimiter("\t") // node IDs are separated by tabs
            .ignoreComments("#")  // comment lines start with "#"
            .types(Long.class, Long.class);

        try {
            System.out.println("Tuple size: " + edgeTuples.count());
        } catch (Exception e1) {
            e1.printStackTrace();
        }
       
        /*
         * @param <K> the key type for edge and vertex identifiers
         * @param <VV> the value type for vertices
         * @param <EV> the value type for edges
         * public class Graph<K, VV, EV>
         */
       
       
        final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
            edgeTuples,
            new MapFunction<Long, Long>() {
                private static final long serialVersionUID = 8713516577419451509L;
                public Long map(Long value) {
                    return value;
                }
            },
            env
        );

       
        try {
            /**
             * @param <K> key type
             * @param <VV> vertex value type
             * @param <EV> edge value type
             * @param <T> the return type
             
            class ConnectedComponents<K, VV extends Comparable<VV>, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, VV>>>
            */
           
            DataSet<Vertex<Long, Long>> verticesWithComponents = graph.run(new ConnectedComponents<Long, Long, NullValue>(30));
            System.out.println("Component count: " + verticesWithComponents.count());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }



However, the following is output on the host machine on execution:

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink run -m 3de7625b8e28:6123 -c flink.graph.example.App /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar /home/myuser/com-dblp.ungraph.txt

Cluster configuration: Standalone cluster with JobManager at /172.19.0.2:6123
Using address 172.19.0.2:6123 to connect to JobManager.
JobManager web interface address http://172.19.0.2:8081
Starting execution of program
Submitting job with JobID: fd6a12896b749e9ed439bbb196c6aaae. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-658812967]

11/08/2016 21:22:44     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
11/08/2016 21:22:44     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
11/08/2016 21:22:44     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
11/08/2016 21:22:44     DataSink (count())(1/1) switched to SCHEDULED
11/08/2016 21:22:44     DataSink (count())(1/1) switched to DEPLOYING
11/08/2016 21:22:44     DataSink (count())(1/1) switched to RUNNING
11/08/2016 21:22:44     DataSink (count())(1/1) switched to FINISHED
11/08/2016 21:22:44     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to FINISHED
11/08/2016 21:22:44     Job execution switched to status FINISHED.
Tuple size: 1049866
Submitting job with JobID: d68d6d775cc222d9fd0728d9666e83de. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-658812967]
11/08/2016 21:22:45     Job execution switched to status RUNNING.
11/08/2016 21:22:45     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED

11/08/2016 21:22:45     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING

11/08/2016 21:22:45     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
11/08/2016 21:22:45     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
11/08/2016 21:22:45     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
11/08/2016 21:22:45     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to SCHEDULED
11/08/2016 21:22:45     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
11/08/2016 21:22:45     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to DEPLOYING
11/08/2016 21:22:45     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to RUNNING
11/08/2016 21:22:45     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to RUNNING
11/08/2016 21:22:45     CoGroup (Messaging)(1/1) switched to SCHEDULED
11/08/2016 21:22:45     CoGroup (Messaging)(1/1) switched to DEPLOYING
11/08/2016 21:22:45     CoGroup (Messaging)(1/1) switched to RUNNING
11/08/2016 21:22:45     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED
11/08/2016 21:22:45     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING
11/08/2016 21:22:45     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to RUNNING
11/08/2016 21:22:47     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to FINISHED
11/08/2016 21:22:47     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to FINISHED
11/08/2016 21:22:48     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to FINISHED
11/08/2016 21:22:48     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to SCHEDULED
11/08/2016 21:22:48     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to DEPLOYING
11/08/2016 21:22:48     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to RUNNING
11/08/2016 21:22:48     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to FAILED
java.lang.IllegalArgumentException: Too few memory segments provided. Hash Table needs at least 33 memory segments.
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
        at org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
        at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)

11/08/2016 21:22:48     Job execution switched to status FAILING.
java.lang.IllegalArgumentException: Too few memory segments provided. Hash Table needs at least 33 memory segments.
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
        at org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
        at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
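For intuition on why a 7.5 GB TaskManager can still fail needing only 33 segments: managed memory is carved into fixed-size segments and then divided among slots and the memory-consuming operators of a job, so a single operator (here the iteration's solution-set hash table) can end up with a tiny share of a large pool. The following back-of-envelope sketch is NOT Flink's actual accounting; the 32 KB segment size and 0.7 managed-memory fraction are the 1.1 defaults, while `operatorFraction` is a purely hypothetical stand-in for whatever share the runtime assigns to one operator:

```java
// Back-of-envelope sketch (NOT Flink's real accounting) of how a large heap
// can still leave one operator with fewer than 33 memory segments.
public class SegmentEstimate {

    static final long SEGMENT_SIZE = 32 * 1024;  // Flink 1.1 default: 32 KB segments
    static final double MANAGED_FRACTION = 0.7;  // Flink 1.1 default managed-memory share

    // operatorFraction is hypothetical: the effective share of the managed pool
    // one operator receives after division across slots and operators.
    static long segmentsForOperator(long heapBytes, double operatorFraction) {
        long managedBytes = (long) (heapBytes * MANAGED_FRACTION);
        long totalSegments = managedBytes / SEGMENT_SIZE;
        return (long) (totalSegments * operatorFraction);
    }

    public static void main(String[] args) {
        long heap = 7512L * 1024 * 1024;  // taskmanager.heap.mb: 7512
        System.out.println("whole pool:   " + segmentsForOperator(heap, 1.0) + " segments");
        System.out.println("0.01% share:  " + segmentsForOperator(heap, 0.0001) + " segments");
    }
}
```

The point of the sketch: with these defaults the pool holds well over a hundred thousand segments, so failing at 33 suggests the operator's effective share was near zero, i.e. a configuration or version issue rather than a raw lack of heap.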

What I have found online so far has not been enough, and I am not sure of the best way to solve this.

If anyone can help diagnose and correct this issue, I would be very thankful.

Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


Re: Too few memory segments provided. Hash Table needs at least 33 memory segments.

Andrey Melentyev
Hi Miguel,

I tried to reproduce the problem in a similar setup using Flink in Docker with 2 workers:

CONTAINER ID        IMAGE                   COMMAND                  CREATED             STATUS              PORTS                                                      NAMES
3fbaf5876e31        melentye/flink:latest   "docker-entrypoint.sh"   3 minutes ago       Up 3 minutes        6121-6122/tcp                                              flinkdocker_flink-worker_2
bd87bfa6c03d        melentye/flink:latest   "docker-entrypoint.sh"   4 minutes ago       Up 3 minutes        6121-6122/tcp                                              flinkdocker_flink-worker_1
16c7607b3ec2        melentye/flink:latest   "docker-entrypoint.sh"   4 minutes ago       Up 4 minutes        0.0.0.0:8081->8081/tcp, 6123/tcp, 0.0.0.0:9010->9010/tcp   flinkdocker_flink-master_1

for i in $(docker ps --filter name=flink --format={{.ID}}); do
  docker cp ~/Downloads/com-dblp.ungraph.txt $i:/com-dblp.ungraph.txt
done

for i in $(docker ps --filter name=flink-master --format={{.ID}}); do
  docker cp ~/Dev/flink-playground/target/flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar $i:/flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar
done  

docker exec -it $(docker ps --filter name=flink-master --format={{.ID}}) flink run -c com.github.melentye.flink.gelly.DblpConnectedComponents /flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar /com-dblp.ungraph.txt

where flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar contains your code example. It seems to work better for me:

Cluster configuration: Standalone cluster with JobManager at /172.20.0.2:6123
Using address 172.20.0.2:6123 to connect to JobManager.
JobManager web interface address http://172.20.0.2:8081
Starting execution of program
Submitting job with JobID: 3146a443dd208c57f5b795767da0b720. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.20.0.2:6123/user/jobmanager#73377916]
11/10/2016 20:10:26 Job execution switched to status RUNNING.
11/10/2016 20:10:26 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
11/10/2016 20:10:26 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
11/10/2016 20:10:26 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
11/10/2016 20:10:26 DataSink (count())(1/1) switched to SCHEDULED
11/10/2016 20:10:26 DataSink (count())(1/1) switched to DEPLOYING
11/10/2016 20:10:26 DataSink (count())(1/1) switched to RUNNING
11/10/2016 20:10:27 DataSink (count())(1/1) switched to FINISHED
11/10/2016 20:10:27 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to FINISHED
11/10/2016 20:10:27 Job execution switched to status FINISHED.
Tuple size: 1049866
Submitting job with JobID: 76040d6474efa39a8f0a03f420e8746c. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.20.0.2:6123/user/jobmanager#73377916]
11/10/2016 20:10:27 Job execution switched to status RUNNING.
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
11/10/2016 20:10:27 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
11/10/2016 20:10:27 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to DEPLOYING
11/10/2016 20:10:27 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to RUNNING
11/10/2016 20:10:27 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to RUNNING
11/10/2016 20:10:27 CoGroup (Messaging)(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CoGroup (Messaging)(1/1) switched to DEPLOYING
11/10/2016 20:10:28 CoGroup (Messaging)(1/1) switched to RUNNING
11/10/2016 20:10:28 CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED
11/10/2016 20:10:28 CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING
11/10/2016 20:10:28 CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to RUNNING
11/10/2016 20:10:29 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to FINISHED
11/10/2016 20:10:29 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to FINISHED
11/10/2016 20:10:30 CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to FINISHED
11/10/2016 20:10:30 IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to SCHEDULED
11/10/2016 20:10:30 IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to DEPLOYING
11/10/2016 20:10:30 IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to RUNNING
11/10/2016 20:10:30 CoGroup (Vertex State Updates)(1/1) switched to SCHEDULED
11/10/2016 20:10:30 CoGroup (Vertex State Updates)(1/1) switched to DEPLOYING
11/10/2016 20:10:30 CoGroup (Vertex State Updates)(1/1) switched to RUNNING
11/10/2016 20:10:33 Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to SCHEDULED
11/10/2016 20:10:33 Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to DEPLOYING
11/10/2016 20:10:33 Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to RUNNING
11/10/2016 20:10:46 DataSink (count())(1/1) switched to SCHEDULED
11/10/2016 20:10:46 DataSink (count())(1/1) switched to DEPLOYING
11/10/2016 20:10:46 Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to FINISHED
11/10/2016 20:10:46 CoGroup (Vertex State Updates)(1/1) switched to FINISHED
11/10/2016 20:10:46 CoGroup (Messaging)(1/1) switched to FINISHED
11/10/2016 20:10:46 DataSink (count())(1/1) switched to RUNNING
11/10/2016 20:10:46 IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@3e34ace1 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@62fe6067))(1/1) switched to FINISHED
11/10/2016 20:10:46 DataSink (count())(1/1) switched to FINISHED
11/10/2016 20:10:46 Job execution switched to status FINISHED.
Component count: 317080
Program execution finished
Job with JobID 76040d6474efa39a8f0a03f420e8746c has finished.
Job Runtime: 18907 ms
Accumulator Results:
- 50c72721df1e3fb4d82d5d60be4e24e8 (java.lang.Long): 317080

There are minor differences from your setup: I'm using Flink 1.1.3 and default settings for taskmanager.heap.mb and taskmanager.network.numberOfBuffers.

Could you try that as well, to see whether a newer Flink version with default settings works better for you?
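If it helps, the only pom.xml change that trying 1.1.3 implies is bumping the version property, since all the org.apache.flink artifacts reference it (sketch based on the properties block you posted):

```xml
<properties>
    <!-- bump from 1.1.2; every ${flink.version} dependency picks this up -->
    <flink.version>1.1.3</flink.version>
</properties>
```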

Andrey

On Tue, Nov 8, 2016 at 10:35 PM, Miguel Coimbra <[hidden email]> wrote:
Dear community,

I have a problem which I hope you'll be able to help with.
I apologize in advance for the verbosity of the post.
I am running the Flink standalone cluster (not even storing to the filesystem) with 2 Docker containers.

I set the image of the Dockerfile for Flink 1.1.2, which was the same version of the main class in the .jar
The Docker image was configured to use Java 8, which is what the project's pom.xml requires as well.
I have also edited the TaskManager conf/flink-con.yaml to have the following values:

....
taskmanager.heap.mb: 7512
....
taskmanager.network.numberOfBuffers: 16048
....


Properties of this host/docker setup:
- host machine has 256 GB of RAM
- job manager container is running with default flink config
- task manager has 7.5 GB of memory available
- task manager number of buffers is 16048 which is very generous compared to the default value

I am testing on the SNAP DBLP dataset: https://snap.stanford.edu/data/com-DBLP.html
It has:

 317080 nodes
1049866 edges

These are the relevant parts of the pom.xml of the project:
(note: the project executes without error for local executions without the cluster)

....
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.1.2</flink.version>
  </properties>
.....
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.10</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-gelly_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>


I am running (what I believe to be) a simple Gelly application, performing the ConnectedComponents algorithm with 30 iterations:

public static void main(String[] args) {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

       
        final String dataPath = args[0];

        final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
            .fieldDelimiter("\t") // node IDs are separated by spaces
            .ignoreComments("#")  // comments start with "%"
            .types(Long.class, Long.class); 

        try {
            System.out.println("Tuple size: " + edgeTuples.count());
        } catch (Exception e1) {
            e1.printStackTrace();
        }
       
        /*
         * @param <K> the key type for edge and vertex identifiers
         * @param <VV> the value type for vertices
         * @param <EV> the value type for edges
         * public class Graph<K, VV, EV>
         */
       
       
        final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
            edgeTuples,
            new MapFunction<Long, Long>() {
                private static final long serialVersionUID = 8713516577419451509L;
                public Long map(Long value) {
                    return value;
                }
            },
            env
        );

       
        try {
            /**
             * @param <K> key type
             * @param <VV> vertex value type
             * @param <EV> edge value type
             * @param <T> the return type
             
            class ConnectedComponents<K, VV extends Comparable<VV>, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, VV>>>
            */
           
            DataSet<Vertex<Long, Long>> verticesWithComponents = graph.run(new ConnectedComponents<Long, Long, NullValue>(30));
            System.out.println("Component count: " + verticesWithComponents.count());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }



However, the following is output on the host machine on execution:

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink run -m 3de7625b8e28:6123 -c flink.graph.example.App /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar /home/myuser/com-dblp.ungraph.txt

Cluster configuration: Standalone cluster with JobManager at /172.19.0.2:6123
Using address 172.19.0.2:6123 to connect to JobManager.
JobManager web interface address http://172.19.0.2:8081
Starting execution of program
Submitting job with JobID: fd6a12896b749e9ed439bbb196c6aaae. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-658812967]

11/08/2016 21:22:44     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
11/08/2016 21:22:44     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
11/08/2016 21:22:44     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
11/08/2016 21:22:44     DataSink (count())(1/1) switched to SCHEDULED
11/08/2016 21:22:44     DataSink (count())(1/1) switched to DEPLOYING
11/08/2016 21:22:44     DataSink (count())(1/1) switched to RUNNING
11/08/2016 21:22:44     DataSink (count())(1/1) switched to FINISHED
11/08/2016 21:22:44     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to FINISHED
11/08/2016 21:22:44     Job execution switched to status FINISHED.
Tuple size: 1049866
Submitting job with JobID: d68d6d775cc222d9fd0728d9666e83de. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-658812967]
11/08/2016 21:22:45     Job execution switched to status RUNNING.
11/08/2016 21:22:45     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED

11/08/2016 21:22:45     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING

11/08/2016 21:22:45     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
11/08/2016 21:22:45     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
11/08/2016 21:22:45     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
11/08/2016 21:22:45     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to SCHEDULED
11/08/2016 21:22:45     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
11/08/2016 21:22:45     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to DEPLOYING
11/08/2016 21:22:45     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to RUNNING
11/08/2016 21:22:45     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to RUNNING
11/08/2016 21:22:45     CoGroup (Messaging)(1/1) switched to SCHEDULED
11/08/2016 21:22:45     CoGroup (Messaging)(1/1) switched to DEPLOYING
11/08/2016 21:22:45     CoGroup (Messaging)(1/1) switched to RUNNING
11/08/2016 21:22:45     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED
11/08/2016 21:22:45     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING
11/08/2016 21:22:45     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to RUNNING
11/08/2016 21:22:47     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to FINISHED
11/08/2016 21:22:47     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to FINISHED
11/08/2016 21:22:48     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to FINISHED
11/08/2016 21:22:48     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to SCHEDULED
11/08/2016 21:22:48     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to DEPLOYING
11/08/2016 21:22:48     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to RUNNING
11/08/2016 21:22:48     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to FAILED
java.lang.IllegalArgumentException: Too few memory segments provided. Hash Table needs at least 33 memory segments.
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
        at org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
        at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)

11/08/2016 21:22:48     Job execution switched to status FAILING.
java.lang.IllegalArgumentException: Too few memory segments provided. Hash Table needs at least 33 memory segments.
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
        at org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
        at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)

The results I found online so far have not been enough, and I am not sure of the best way to solve this.

If anyone can help diagnose and correct this issue, I would be very thankful.

Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra



Re: Too few memory segments provided. Hash Table needs at least 33 memory segments.

Ufuk Celebi
In reply to this post by Miguel Coimbra
What do the TaskManager logs say with regard to the allocation of managed memory?

Something like:

Limiting managed memory to ... of the currently free heap space ..., memory will be allocated lazily.
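For what it's worth, a quick way to fish that line out of the TaskManager log (the log file name below is a placeholder; the sample `echo` only makes the sketch self-contained):

```shell
# Placeholder log file; point this at your actual TaskManager log file.
log=flink-taskmanager.log

# Sample line so the sketch runs on its own; in practice the TaskManager writes it.
echo 'INFO org.apache.flink.runtime.taskmanager.TaskManager - Limiting managed memory to 0.7 of the currently free heap space (310 MB), memory will be allocated lazily.' > "$log"

# Extract the managed-memory fraction and the free heap it applies to.
grep -o 'Limiting managed memory to [0-9.]* of the currently free heap space ([0-9]* MB)' "$log"
```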

What else did you configure in flink-conf?

Looping in Greg and Vasia, who maintain Gelly and are most familiar with the internals.

– Ufuk


On 8 November 2016 at 22:35:22, Miguel Coimbra ([hidden email]) wrote:

> Dear community,
>  
> I have a problem which I hope you'll be able to help with.
> I apologize in advance for the verbosity of the post.
> I am running the Flink standalone cluster (not even storing to the
> filesystem) with 2 Docker containers.
>  
> I set the image of the Dockerfile for Flink 1.1.2, which was the same
> version of the main class in the .jar
> The Docker image was configured to use Java 8, which is what the project's
> pom.xml requires as well.
> I have also edited the TaskManager conf/flink-con.yaml to have the
> following values:
>  
> ....
> taskmanager.heap.mb: 7512
> ....
> taskmanager.network.numberOfBuffers: 16048
> ....
>  
>  
> Properties of this host/docker setup:
> - host machine has *256 GB *of RAM
> - job manager container is running with default flink config
> - task manager has *7.5 GB *of memory available
> - task manager number of buffers is *16048 *which is very generous compared
> to the default value
>  
> I am testing on the SNAP DBLP dataset:
> https://snap.stanford.edu/data/com-DBLP.html
> It has:
>  
> 317080 nodes
> 1049866 edges
>  
> These are the relevant parts of the pom.xml of the project:
> *(note: the project executes without error for local executions without the
> cluster)*
>  
> ....
>  
> <properties>
>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
>     <maven.compiler.source>1.8</maven.compiler.source>
>     <maven.compiler.target>1.8</maven.compiler.target>
>     <flink.version>1.1.2</flink.version>
> </properties>
> .....
> <dependencies>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-java</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-core</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-streaming-java_2.10</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-clients_2.10</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-gelly_2.10</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>junit</groupId>
>         <artifactId>junit</artifactId>
>         <version>3.8.1</version>
>         <scope>test</scope>
>     </dependency>
> </dependencies>
>  
>  
> I am running (what I believe to be) a simple Gelly application, performing
> the ConnectedComponents algorithm with 30 iterations:
>  
> public static void main(String[] args) {
>     final ExecutionEnvironment env =
>             ExecutionEnvironment.getExecutionEnvironment();
>
>     final String dataPath = args[0];
>
>     final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
>             .fieldDelimiter("\t") // node IDs are separated by tabs
>             .ignoreComments("#") // comments start with "#"
>             .types(Long.class, Long.class);
>
>     try {
>         System.out.println("Tuple size: " + edgeTuples.count());
>     } catch (Exception e1) {
>         e1.printStackTrace();
>     }
>
>     /*
>      * @param <K> the key type for edge and vertex identifiers
>      * @param <VV> the value type for vertices
>      * @param <EV> the value type for edges
>      * public class Graph<K, VV, EV>
>      */
>     final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
>             edgeTuples,
>             new MapFunction<Long, Long>() {
>                 private static final long serialVersionUID =
>                         8713516577419451509L;
>                 public Long map(Long value) {
>                     return value;
>                 }
>             },
>             env
>     );
>
>     try {
>         /*
>          * @param <K> key type
>          * @param <VV> vertex value type
>          * @param <EV> edge value type
>          * @param <T> the return type
>          *
>          * class ConnectedComponents<K extends Comparable<K>, EV>
>          * implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>>
>          */
>         DataSet<Vertex<Long, Long>> verticesWithComponents =
>                 graph.run(new ConnectedComponents<Long, NullValue>(30));
>         System.out.println("Component count: " +
>                 verticesWithComponents.count());
>     } catch (Exception e) {
>         e.printStackTrace();
>     }
> }
>  
>  
> However, the following is output on the host machine on execution:
>  
> docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}})
> flink run -m 3de7625b8e28:6123 -c flink.graph.example.App
> /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar
> /home/myuser/com-dblp.ungraph.txt
>  
> Cluster configuration: Standalone cluster with JobManager at /
> 172.19.0.2:6123
> Using address 172.19.0.2:6123 to connect to JobManager.
> JobManager web interface address http://172.19.0.2:8081
> Starting execution of program
> Submitting job with JobID: fd6a12896b749e9ed439bbb196c6aaae. Waiting for
> job completion.
> Connected to JobManager at Actor[akka.tcp://
> flink@172.19.0.2:6123/user/jobmanager#-658812967]
>  
> 11/08/2016 21:22:44 DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
> SCHEDULED
> 11/08/2016 21:22:44 DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
> DEPLOYING
> 11/08/2016 21:22:44 DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING  
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to SCHEDULED
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to DEPLOYING
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to RUNNING
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to FINISHED
> 11/08/2016 21:22:44 DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
> FINISHED
> 11/08/2016 21:22:44 Job execution switched to status FINISHED.
> Tuple size: 1049866
> Submitting job with JobID: d68d6d775cc222d9fd0728d9666e83de. Waiting for
> job completion.
> Connected to JobManager at Actor[akka.tcp://
> flink@172.19.0.2:6123/user/jobmanager#-658812967]
> 11/08/2016 21:22:45 Job execution switched to status RUNNING.
> 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
>  
> 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
>  
> 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
> 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
> 11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at
> fromDataSet(Graph.java:216)) -> Combine(Distinct at
> fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
> 11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
> SCHEDULED
> 11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at
> fromDataSet(Graph.java:216)) -> Combine(Distinct at
> fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
> 11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
> DEPLOYING
> 11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to RUNNING
> 11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at
> fromDataSet(Graph.java:216)) -> Combine(Distinct at
> fromDataSet(Graph.java:216))(1/1) switched to RUNNING
> 11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to SCHEDULED
> 11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to DEPLOYING
> 11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to RUNNING
> 11/08/2016 21:22:45 CHAIN Reduce (Distinct at
> fromDataSet(Graph.java:216)) -> Map (Map at
> fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED
> 11/08/2016 21:22:45 CHAIN Reduce (Distinct at
> fromDataSet(Graph.java:216)) -> Map (Map at
> fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING
> 11/08/2016 21:22:45 CHAIN Reduce (Distinct at
> fromDataSet(Graph.java:216)) -> Map (Map at
> fromDataSet(Graph.java:217))(1/1) switched to RUNNING
> 11/08/2016 21:22:47 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to FINISHED
> 11/08/2016 21:22:47 CHAIN FlatMap (FlatMap at
> fromDataSet(Graph.java:216)) -> Combine(Distinct at
> fromDataSet(Graph.java:216))(1/1) switched to FINISHED
> 11/08/2016 21:22:48 CHAIN Reduce (Distinct at
> fromDataSet(Graph.java:216)) -> Map (Map at
> fromDataSet(Graph.java:217))(1/1) switched to FINISHED
> 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)  
> switched to SCHEDULED
> 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)  
> switched to DEPLOYING
> 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)  
> switched to RUNNING
> 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)  
> switched to FAILED
> java.lang.IllegalArgumentException: Too few memory segments provided. Hash
> Table needs at least 33 memory segments.
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)  
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)  
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)  
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>  
> 11/08/2016 21:22:48 Job execution switched to status FAILING.
> java.lang.IllegalArgumentException: Too few memory segments provided. Hash
> Table needs at least 33 memory segments.
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)  
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)  
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)  
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>  
> The results I found online so far were not enough, and I am not sure as to
> the best way to solve this.
>  
> If anyone can help diagnose and correct this issue, I would be very
> thankful.
>  
> Best regards,
>  
> Miguel E. Coimbra
> Email: [hidden email]  
> Skype: miguel.e.coimbra
>  


Re: Too few memory segments provided. Hash Table needs at least 33 memory segments.

Miguel Coimbra
Hello,

I believe I have figured this out.

First, I tried Andrey Melentyev's suggestion of executing with Apache Flink 1.1.3, both with the default conf/flink-conf.yaml parameters and with some changes to provide additional memory. However, the same error occurred.

Note: I changed my project's pom.xml and generated the .jar again using Maven.
I also copied the new .jar to both Docker instances.

The test machine has 256 GB of RAM, and the setup consists of two Docker containers.
I have attached the relevant parts of the JobManager and TaskManager logs.
Regarding memory in the TaskManager log, I was looking at a couple of executions and noticed something strange:

2016-11-14 15:48:45,256 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
2016-11-14 15:48:45,413 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Limiting managed memory to 0.7 of the currently free heap space (310 MB), memory will be allocated lazily.


After that, I looked at the start of the TaskManager log and found this:

2016-11-14 15:48:38,843 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Starting TaskManager (Version: 1.1.3, Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC)
2016-11-14 15:48:38,843 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Current user: flink
2016-11-14 15:48:38,844 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
2016-11-14 15:48:38,844 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Maximum heap size: 512 MiBytes
2016-11-14 15:48:38,844 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Hadoop version: 2.7.2
2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  JVM Options:
2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:+UseG1GC
2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xms512M
2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xmx512M
2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:MaxDirectMemorySize=8388607T


It seems it is running with only 512 MB, which is the default.
This is in spite of my having edited the flink-conf.yaml file before invoking the program on the cluster.
The JobManager log showed the same thing: it was using the default 256 MB instead of my 1024 MB.
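That would explain the failure, as I understand it: a back-of-the-envelope check of how many 32 KiB memory segments the lazily allocated managed memory corresponds to (the 0.7 fraction and segment size are taken from the log lines above; the operator count at the end is purely a made-up illustration of how an individual operator's share can shrink):

```shell
# Managed memory per the TaskManager log: 0.7 of the 310 MB free heap.
heap_free_mb=310
managed_mb=$(( heap_free_mb * 7 / 10 ))          # 217 MB of managed memory
segment_kb=32                                    # bytes per segment: 32768
total_segments=$(( managed_mb * 1024 / segment_kb ))
echo "total segments: $total_segments"           # 6944

# Those segments are divided among the job's memory-consuming operators
# (sorts, joins, the iteration's compacting hash table); with enough of
# them, one operator's share can fall below the 33-segment minimum.
operators=16                                     # hypothetical operator count
echo "per operator: $(( total_segments / operators ))"
```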

To recap, I built the Docker Flink image with the following (Dockerfile attached):

cd docker-flink-image-builder/
ls
Dockerfile  Dockerfile~  README.md  README.md~  bluemix-docker-compose.sh*  build.sh*  docker-compose-bluemix.yml
./build.sh

The only file I changed from those is the Dockerfile.
This set of files was obtained from the Flink repository.
I used docker-compose up to start the standalone cluster:

screen
cd docker-flink-image-builder/
ls
Dockerfile  Dockerfile~  README.md  README.md~  bluemix-docker-compose.sh*  build.sh*  docker-compose-bluemix.yml  docker-compose.yml  docker-entrypoint.sh*

docker-compose up

Then I accessed each Docker instance:

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) /bin/sh
docker exec -it $(docker ps --filter name=dockerflinkimagebuilder_taskmanager_1 --format={{.ID}}) /bin/sh

While inside each of those, I started a bash shell and changed the config file like so:

bash
cd /home/myuser/docker-image-build-context/flink-1.1.3/conf
vi flink-conf.yaml

I have edited (on both the JobManager and the TaskManager) the following settings:

# The heap size for the JobManager JVM
jobmanager.heap.mb: 1024

# The heap size for the TaskManager JVM
taskmanager.heap.mb: 4096

# The number of buffers for the network stack.
taskmanager.network.numberOfBuffers: 4096

It seems that changes I make to the flink-conf.yaml file are only reflected after I kill the cluster and call docker-compose up again.

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink run -m 707a534982e6:6123 -c flink.graph.example.App /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar /home/myuser/com-dblp.ungraph.txt

The cluster now started with the correct memory values, but the result was the same (it is in the logs).
However, I then doubled the memory again, so that I had:

# The heap size for the JobManager JVM
jobmanager.heap.mb: 2048

# The heap size for the TaskManager JVM
taskmanager.heap.mb: 8192

After this, I killed the cluster (CTRL+C) in the screen session that had started it (a graceful exit).
This time, after starting again with docker-compose up, I launched the program again and it worked!

However, there is something I don't understand, perhaps because I am new to the Docker ecosystem.
When do the changes to the flink-conf.yaml file get activated?

From my understanding, I have to do this:

1 - Launch cluster with docker-compose up
2 - exec -it into each of the Docker instances and manually edit the configuration file
3 - CTRL+C to gracefully kill cluster
4 - Relaunch cluster - it will now display correct heap values for the JobManager and TaskManager.

This is cumbersome.
I know I can make my own scripts to automate this, but is this really the correct way to launch a Flink standalone cluster on Docker with custom memory options?

Should I instead change the Dockerfile to include a custom flink-conf.yaml file when building the image? (so this would be taken right from the start)
What is the correct way to tackle this?
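Rather than editing inside each running container (step 2 above), one option might be to generate the config on the host before `docker-compose up` and get it into the containers via a bind mount or a COPY in the Dockerfile. A minimal sketch, under the assumption that a host-side flink-conf.yaml is what the image or a volume picks up (the file name and settings mirror this thread; the mechanism is not verified against the official image):

```shell
# Write the desired settings once on the host, before `docker-compose up`,
# instead of editing flink-conf.yaml inside each running container.
conf=flink-conf.yaml   # hypothetical host-side copy the image/volume picks up
cat > "$conf" <<'EOF'
jobmanager.heap.mb: 2048
taskmanager.heap.mb: 8192
taskmanager.network.numberOfBuffers: 4096
EOF

# Sanity check: both heap settings are present.
grep -c 'heap.mb' "$conf"   # 2
```

A bind mount in docker-compose.yml (e.g. mapping this file over the container's conf directory) would then make step 3/4 a plain restart with no manual edits.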

Thank you very much!

Output is below in case you are curious:

myuser@myserver:~/docker-flink-image-builder$ docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink run -m 707a534982e6:6123 -c flink.graph.example.App /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar /home/myuser/com-dblp.ungraph.txt
Cluster configuration: Standalone cluster with JobManager at /172.19.0.2:6123
Using address 172.19.0.2:6123 to connect to JobManager.
JobManager web interface address http://172.19.0.2:8081
Starting execution of program
Submitting job with JobID: 55544e0ebc1f5014df53b200974afdbf. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-1305686264]
11/14/2016 17:13:33     Job execution switched to status RUNNING.
11/14/2016 17:13:33     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
11/14/2016 17:13:33     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
11/14/2016 17:13:33     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
11/14/2016 17:13:34     DataSink (count())(1/1) switched to SCHEDULED
11/14/2016 17:13:34     DataSink (count())(1/1) switched to DEPLOYING
11/14/2016 17:13:34     DataSink (count())(1/1) switched to RUNNING
11/14/2016 17:13:36     DataSink (count())(1/1) switched to FINISHED
11/14/2016 17:13:36     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to FINISHED
Tuple size: 1049866
Submitting job with JobID: ab0931dc89e4a86de17549eeb518fde6. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-1305686264]
11/14/2016 17:13:37     Job execution switched to status RUNNING.
11/14/2016 17:13:37     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
11/14/2016 17:13:37     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
11/14/2016 17:13:37     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
11/14/2016 17:13:39     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
11/14/2016 17:13:39     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
11/14/2016 17:13:39     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
11/14/2016 17:13:39     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to SCHEDULED
11/14/2016 17:13:39     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to DEPLOYING
11/14/2016 17:13:39     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to RUNNING
11/14/2016 17:13:39     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to RUNNING
11/14/2016 17:13:40     CoGroup (Messaging)(1/1) switched to SCHEDULED
11/14/2016 17:13:40     CoGroup (Messaging)(1/1) switched to DEPLOYING
11/14/2016 17:13:40     CoGroup (Messaging)(1/1) switched to RUNNING
11/14/2016 17:13:44     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED
11/14/2016 17:13:44     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING
11/14/2016 17:13:44     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to RUNNING
11/14/2016 17:13:49     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to FINISHED
11/14/2016 17:13:50     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to FINISHED
11/14/2016 17:13:54     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to FINISHED
11/14/2016 17:13:54     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to SCHEDULED
11/14/2016 17:13:54     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to DEPLOYING
11/14/2016 17:13:54     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to RUNNING
11/14/2016 17:13:55     CoGroup (Vertex State Updates)(1/1) switched to SCHEDULED
11/14/2016 17:13:55     CoGroup (Vertex State Updates)(1/1) switched to DEPLOYING
11/14/2016 17:13:55     CoGroup (Vertex State Updates)(1/1) switched to RUNNING
11/14/2016 17:14:06     Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to SCHEDULED
11/14/2016 17:14:06     Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to DEPLOYING
11/14/2016 17:14:06     Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to RUNNING
11/14/2016 17:15:00     Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to FINISHED
11/14/2016 17:15:00     DataSink (count())(1/1) switched to SCHEDULED
11/14/2016 17:15:00     DataSink (count())(1/1) switched to DEPLOYING
11/14/2016 17:15:00     CoGroup (Vertex State Updates)(1/1) switched to FINISHED
11/14/2016 17:15:00     DataSink (count())(1/1) switched to RUNNING
11/14/2016 17:15:00     CoGroup (Messaging)(1/1) switched to FINISHED
11/14/2016 17:15:00     DataSink (count())(1/1) switched to FINISHED
11/14/2016 17:15:00     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to FINISHED
11/14/2016 17:15:00     Job execution switched to status FINISHED.
Component count: 317080
Program execution finished
Job with JobID ab0931dc89e4a86de17549eeb518fde6 has finished.
Job Runtime: 83229 ms
Accumulator Results:
- e6c358969906b4ce1d682d6840281848 (java.lang.Long): 317080

Thank you for your attention. The issue seems solved.


Kind regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 14 November 2016 at 09:26, Ufuk Celebi <[hidden email]> wrote:
What do the TaskManager logs say wrt to allocation of managed memory?

Something like:

Limiting managed memory to ... of the currently free heap space ..., memory will be allocated lazily.

What else did you configure in flink-conf?

Looping in Greg and Vasia who maintain Gelly and are most-familiar with the internals.

– Ufuk


> 11/08/2016 21:22:45 CHAIN Reduce (Distinct at
> fromDataSet(Graph.java:216)) -> Map (Map at
> fromDataSet(Graph.java:217))(1/1) switched to RUNNING
> 11/08/2016 21:22:47 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to FINISHED
> 11/08/2016 21:22:47 CHAIN FlatMap (FlatMap at
> fromDataSet(Graph.java:216)) -> Combine(Distinct at
> fromDataSet(Graph.java:216))(1/1) switched to FINISHED
> 11/08/2016 21:22:48 CHAIN Reduce (Distinct at
> fromDataSet(Graph.java:216)) -> Map (Map at
> fromDataSet(Graph.java:217))(1/1) switched to FINISHED
> 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)
> switched to SCHEDULED
> 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)
> switched to DEPLOYING
> 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)
> switched to RUNNING
> 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)
> switched to FAILED
> java.lang.IllegalArgumentException: Too few memory segments provided. Hash
> Table needs at least 33 memory segments.
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>
> 11/08/2016 21:22:48 Job execution switched to status FAILING.
> java.lang.IllegalArgumentException: Too few memory segments provided. Hash
> Table needs at least 33 memory segments.
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>
> The results I found online so far were not enough, and I am not sure as to
> the best way to solve this.
>
> If anyone can help diagnose and correct this issue, I would be very
> thankful.
>
> Best regards,
>
> Miguel E. Coimbra
> Email: [hidden email]
> Skype: miguel.e.coimbra
>



flink-client.log (28K) Download Attachment
flink-taskmanager.log (99K) Download Attachment
Dockerfile (2K) Download Attachment

Re: Too few memory segments provided. Hash Table needs at least 33 memory segments.

Vasiliki Kalavri
Hi Miguel,

I'm sorry for the late reply; this e-mail got stuck in my spam folder. I'm glad that you've found a solution :)

I've never used Flink with Docker, so I'm probably not the best person to advise you on this. However, if I understand correctly, you're changing the configuration while the Flink cluster is already running, before submitting the job. I don't know whether Docker is supposed to behave differently here, but once a Flink cluster has been started, its nodes won't reload any changes you make to flink-conf.yaml. You'll either have to make your changes before starting the cluster or restart it afterwards.

Cheers,
-Vasia.

On 14 November 2016 at 18:33, Miguel Coimbra <[hidden email]> wrote:
Hello,

I believe I have figured this out.

First, I tried Andrey Melentyev's suggestion of executing with Apache Flink 1.1.3, both with the default conf/flink-conf.yaml parameters and with some changes to provide additional memory. However, the same error happened.

Note: I changed my project's pom.xml and generated the .jar again using Maven.
I also copied the new .jar to both Docker instances.

The test machine has 256 GB of RAM, and the setup consists of two Docker containers.
I attach the relevant parts of the logs of the JobManager and of the TaskManager.
Regarding memory in the TaskManager log, I was looking at a couple of executions and noticed something strange:

2016-11-14 15:48:45,256 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
2016-11-14 15:48:45,413 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Limiting managed memory to 0.7 of the currently free heap space (310 MB), memory will be allocated lazily.


After that, I looked at the start of the TaskManager log and found this:

2016-11-14 15:48:38,843 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Starting TaskManager (Version: 1.1.3, Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC)
2016-11-14 15:48:38,843 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Current user: flink
2016-11-14 15:48:38,844 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
2016-11-14 15:48:38,844 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Maximum heap size: 512 MiBytes
2016-11-14 15:48:38,844 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Hadoop version: 2.7.2
2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  JVM Options:
2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:+UseG1GC
2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xms512M
2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xmx512M
2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:MaxDirectMemorySize=8388607T


It seems it is running with only 512 MB, which is the default.
This is despite my having edited the flink-conf.yaml file before invoking the program on the cluster.
The JobManager log shows the same thing: it was using the default 256 MB instead of my 1024 MB.
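For reference, the 33-segment minimum in the exception maps to a concrete amount of managed memory. A back-of-the-envelope sketch (the 32 KiB segment size comes from the "bytes per segment: 32768" line in the TaskManager log above; the arithmetic below is illustrative shell, not Flink code):

```shell
# The CompactingHashTable needs at least 33 memory segments; with the
# default segment size of 32 KiB that is about 1 MiB of managed memory
# for that single table. 310 MB of managed memory corresponds to 9920
# segments in total, which the runtime then has to split across all
# memory-consuming operators in the plan.
SEGMENT_SIZE=$((32 * 1024))
echo "minimum bytes for the hash table:     $((33 * SEGMENT_SIZE))"
echo "segments in 310 MB of managed memory: $((310 * 1024 * 1024 / SEGMENT_SIZE))"
```

The pool itself is far above 1 MiB, which suggests the failing hash table was granted only a small fraction of it once split across the operators of the plan; a larger heap raises every operator's share.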

- To recap, I built the Docker Flink image as follows (the Dockerfile is attached):

cd docker-flink-image-builder/
ls
Dockerfile  Dockerfile~  README.md  README.md~  bluemix-docker-compose.sh*  build.sh*  docker-compose-bluemix.yml
./build.sh

The only file I changed from those is the Dockerfile.
This set of files was obtained from the Flink repository.
I used docker-compose up to start the standalone cluster:

screen
cd docker-flink-image-builder/
ls
Dockerfile  Dockerfile~  README.md  README.md~  bluemix-docker-compose.sh*  build.sh*  docker-compose-bluemix.yml  docker-compose.yml  docker-entrypoint.sh*

docker-compose up

Then I accessed each Docker instance:

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) /bin/sh
docker exec -it $(docker ps --filter name=dockerflinkimagebuilder_taskmanager_1 --format={{.ID}}) /bin/sh

While inside each of those, I started a bash shell and changed the config file like so:

bash
cd /home/myuser/docker-image-build-context/flink-1.1.3/conf
vi flink-conf.yaml

I have edited (on both the JobManager and the TaskManager) the following settings:

# The heap size for the JobManager JVM
jobmanager.heap.mb: 1024

# The heap size for the TaskManager JVM
taskmanager.heap.mb: 4096

# The number of buffers for the network stack.
taskmanager.network.numberOfBuffers: 4096

It seems that changes I make to the flink-conf.yaml file are only reflected after I kill the cluster and call docker-compose up again.

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink run -m 707a534982e6:6123 -c flink.graph.example.App /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar /home/myuser/com-dblp.ungraph.txt

The cluster now started with the correct memory values, but the result was the same (it is in the logs).
However, I then doubled the memory again, so that I had:

# The heap size for the JobManager JVM
jobmanager.heap.mb: 2048

# The heap size for the TaskManager JVM
taskmanager.heap.mb: 8192

After this, I killed the cluster (CTRL+C) in the screen session that had started it (graceful exit).
This time, after starting again with docker-compose up, I launched the program again and it worked!

However, there is something I don't understand, perhaps because I am new to the Docker ecosystem.
When do the changes to the flink-conf.yaml file get activated?

From my understanding, I have to do this:

1 - Launch the cluster with docker-compose up
2 - docker exec -it into each of the Docker instances and manually edit the configuration file
3 - CTRL+C to gracefully kill the cluster
4 - Relaunch the cluster - it will now display the correct heap values for the JobManager and TaskManager.

This is cumbersome.
I know I can make my own scripts to automate this, but is this really the correct way to launch a Flink standalone cluster on Docker with custom memory options?

Should I instead change the Dockerfile to include a custom flink-conf.yaml file when building the image? (so this would be taken right from the start)
What is the correct way to tackle this?
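One way to avoid the manual edit-and-restart cycle is to patch flink-conf.yaml in the image build context before running ./build.sh, so the images ship the desired settings from the start. A sketch (the stand-in config contents and the sed invocation are illustrative assumptions, not taken from the actual build context):

```shell
# Illustrative only: write a minimal stand-in flink-conf.yaml, then patch
# the heap and buffer settings in place. In the real setup the sed command
# would target the flink-conf.yaml shipped in the Docker image build
# context, before ./build.sh is run.
CONF=flink-conf.yaml
printf '%s\n' \
    'jobmanager.heap.mb: 256' \
    'taskmanager.heap.mb: 512' \
    'taskmanager.network.numberOfBuffers: 2048' > "$CONF"

sed -i \
    -e 's/^jobmanager\.heap\.mb:.*/jobmanager.heap.mb: 2048/' \
    -e 's/^taskmanager\.heap\.mb:.*/taskmanager.heap.mb: 8192/' \
    -e 's/^taskmanager\.network\.numberOfBuffers:.*/taskmanager.network.numberOfBuffers: 4096/' \
    "$CONF"

cat "$CONF"
```

After rebuilding the image and bringing the cluster up with docker-compose up, the TaskManager log should report the new heap size at startup.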

Thank you very much!

Output is below in case you are curious:

myuser@myserver:~/docker-flink-image-builder$ docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink run -m 707a534982e6:6123 -c flink.graph.example.App /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar /home/myuser/com-dblp.ungraph.txt
Cluster configuration: Standalone cluster with JobManager at /172.19.0.2:6123
Using address 172.19.0.2:6123 to connect to JobManager.
JobManager web interface address http://172.19.0.2:8081
Starting execution of program
Submitting job with JobID: 55544e0ebc1f5014df53b200974afdbf. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-1305686264]
11/14/2016 17:13:33     Job execution switched to status RUNNING.
11/14/2016 17:13:33     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
11/14/2016 17:13:33     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
11/14/2016 17:13:33     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
11/14/2016 17:13:34     DataSink (count())(1/1) switched to SCHEDULED
11/14/2016 17:13:34     DataSink (count())(1/1) switched to DEPLOYING
11/14/2016 17:13:34     DataSink (count())(1/1) switched to RUNNING
11/14/2016 17:13:36     DataSink (count())(1/1) switched to FINISHED
11/14/2016 17:13:36     DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to FINISHED
Tuple size: 1049866
Submitting job with JobID: ab0931dc89e4a86de17549eeb518fde6. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-1305686264]
11/14/2016 17:13:37     Job execution switched to status RUNNING.
11/14/2016 17:13:37     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
11/14/2016 17:13:37     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
11/14/2016 17:13:37     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
11/14/2016 17:13:39     CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
11/14/2016 17:13:39     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
11/14/2016 17:13:39     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
11/14/2016 17:13:39     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to SCHEDULED
11/14/2016 17:13:39     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to DEPLOYING
11/14/2016 17:13:39     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to RUNNING
11/14/2016 17:13:39     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to RUNNING
11/14/2016 17:13:40     CoGroup (Messaging)(1/1) switched to SCHEDULED
11/14/2016 17:13:40     CoGroup (Messaging)(1/1) switched to DEPLOYING
11/14/2016 17:13:40     CoGroup (Messaging)(1/1) switched to RUNNING
11/14/2016 17:13:44     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED
11/14/2016 17:13:44     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING
11/14/2016 17:13:44     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to RUNNING
11/14/2016 17:13:49     CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to FINISHED
11/14/2016 17:13:50     CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to FINISHED
11/14/2016 17:13:54     CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to FINISHED
11/14/2016 17:13:54     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to SCHEDULED
11/14/2016 17:13:54     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to DEPLOYING
11/14/2016 17:13:54     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to RUNNING
11/14/2016 17:13:55     CoGroup (Vertex State Updates)(1/1) switched to SCHEDULED
11/14/2016 17:13:55     CoGroup (Vertex State Updates)(1/1) switched to DEPLOYING
11/14/2016 17:13:55     CoGroup (Vertex State Updates)(1/1) switched to RUNNING
11/14/2016 17:14:06     Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to SCHEDULED
11/14/2016 17:14:06     Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to DEPLOYING
11/14/2016 17:14:06     Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to RUNNING
11/14/2016 17:15:00     Sync (Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to FINISHED
11/14/2016 17:15:00     DataSink (count())(1/1) switched to SCHEDULED
11/14/2016 17:15:00     DataSink (count())(1/1) switched to DEPLOYING
11/14/2016 17:15:00     CoGroup (Vertex State Updates)(1/1) switched to FINISHED
11/14/2016 17:15:00     DataSink (count())(1/1) switched to RUNNING
11/14/2016 17:15:00     CoGroup (Messaging)(1/1) switched to FINISHED
11/14/2016 17:15:00     DataSink (count())(1/1) switched to FINISHED
11/14/2016 17:15:00     IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) switched to FINISHED
11/14/2016 17:15:00     Job execution switched to status FINISHED.
Component count: 317080
Program execution finished
Job with JobID ab0931dc89e4a86de17549eeb518fde6 has finished.
Job Runtime: 83229 ms
Accumulator Results:
- e6c358969906b4ce1d682d6840281848 (java.lang.Long): 317080

Thank you for your attention. It seems solved.


Kind regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 14 November 2016 at 09:26, Ufuk Celebi <[hidden email]> wrote:
What do the TaskManager logs say wrt to allocation of managed memory?

Something like:

Limiting managed memory to ... of the currently free heap space ..., memory will be allocated lazily.

What else did you configure in flink-conf?

Looping in Greg and Vasia, who maintain Gelly and are most familiar with the internals.

– Ufuk


On 8 November 2016 at 22:35:22, Miguel Coimbra ([hidden email]) wrote:
> Dear community,
>
> I have a problem which I hope you'll be able to help with.
> I apologize in advance for the verbosity of the post.
> I am running the Flink standalone cluster (not even storing to the
> filesystem) with 2 Docker containers.
>
> I set the image of the Dockerfile for Flink 1.1.2, which was the same
> version of the main class in the .jar
> The Docker image was configured to use Java 8, which is what the project's
> pom.xml requires as well.
> I have also edited the TaskManager conf/flink-conf.yaml to have the
> following values:
>
> ....
> taskmanager.heap.mb: 7512
> ....
> taskmanager.network.numberOfBuffers: 16048
> ....
>
>
> Properties of this host/docker setup:
> - host machine has 256 GB of RAM
> - job manager container is running with default flink config
> - task manager has 7.5 GB of memory available
> - task manager number of buffers is 16048, which is very generous compared
> to the default value
>
> I am testing on the SNAP DBLP dataset:
> https://snap.stanford.edu/data/com-DBLP.html
> It has:
>
> 317080 nodes
> 1049866 edges
>
> These are the relevant parts of the pom.xml of the project:
> (note: the project executes without error for local executions without the
> cluster)
>
> ....
>
> <properties>
>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
>     <maven.compiler.source>1.8</maven.compiler.source>
>     <maven.compiler.target>1.8</maven.compiler.target>
>     <flink.version>1.1.2</flink.version>
> </properties>
>
> .....
>
> <dependencies>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-java</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-core</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-streaming-java_2.10</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-clients_2.10</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-gelly_2.10</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>junit</groupId>
>         <artifactId>junit</artifactId>
>         <version>3.8.1</version>
>         <scope>test</scope>
>     </dependency>
> </dependencies>
>
>
>
> I am running (what I believe to be) a simple Gelly application, performing
> the ConnectedComponents algorithm with 30 iterations:
>
> public static void main(String[] args) {
>     final ExecutionEnvironment env =
>             ExecutionEnvironment.getExecutionEnvironment();
>
>     final String dataPath = args[0];
>
>     final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
>             .fieldDelimiter("\t") // node IDs are separated by tabs
>             .ignoreComments("#")  // comment lines start with "#"
>             .types(Long.class, Long.class);
>
>     try {
>         System.out.println("Tuple size: " + edgeTuples.count());
>     } catch (Exception e1) {
>         e1.printStackTrace();
>     }
>
>     /*
>      * @param <K> the key type for edge and vertex identifiers
>      * @param <VV> the value type for vertices
>      * @param <EV> the value type for edges
>      * public class Graph<K, VV, EV>
>      */
>     final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
>             edgeTuples,
>             new MapFunction<Long, Long>() {
>                 private static final long serialVersionUID =
>                         8713516577419451509L;
>                 public Long map(Long value) {
>                     return value;
>                 }
>             },
>             env
>     );
>
>     try {
>         /*
>          * @param <K> key type
>          * @param <VV> vertex value type
>          * @param <EV> edge value type
>          * class ConnectedComponents<K extends Comparable<K>, EV>
>          *     implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>>
>          */
>         DataSet<Vertex<Long, Long>> verticesWithComponents =
>                 graph.run(new ConnectedComponents<Long, NullValue>(30));
>         System.out.println("Component count: " +
>                 verticesWithComponents.count());
>     } catch (Exception e) {
>         e.printStackTrace();
>     }
> }
>
>
> However, the following is output on the host machine on execution:
>
> [command, log output, and stack trace omitted; identical to the listing at
> the top of this thread]
>
> The results I found online so far were not enough, and I am not sure as to
> the best way to solve this.
>
> If anyone can help diagnose and correct this issue, I would be very
> thankful.
>
> Best regards,
>
> Miguel E. Coimbra
> Email: [hidden email]
> Skype: miguel.e.coimbra
>