Too few memory segments provided exception

classic Classic list List threaded Threaded
16 messages Options
Reply | Threaded
Open this post in threaded view
|

Too few memory segments provided exception

Shivani Ghatge
Hello,
 I am working on a problem which implements Adamic Adar Algorithm using Gelly.
I am running into this exception for all the Joins (including the one that are part of the reduceOnNeighbors function)

Too few memory segments provided. Hash Join needs at least 33 memory segments.


The problem persists even when I comment out some of the joins.

Even after using edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

as suggested by @AndraLungu the problem persists.

The code is


DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

        //get neighbors of each vertex in the HashSet for it's value
        computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
       
        //get vertices with updated values for the final Graph which will be used to get Adamic Edges
        Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());

        Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
                Graph.fromDataSet(Vertices, edges, env);
       
        //configure Vertex Centric Iteration
        VertexCentricConfiguration parameters = new VertexCentricConfiguration();

        parameters.setName("Find Adamic Adar Edge Weights");

        parameters.setDirection(EdgeDirection.ALL);
       
        //run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
        updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
       
        //Extract Vertices of the updated graph
        DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
       
        //Extract the list of Edges from the vertex values
        DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
       
        //Partial weights for the edges are added
        edg = edg.groupBy(0,1).reduce(new AdamGroup());

        //Graph is updated with the Adamic Adar Edges
        edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

Any idea how I could tackle this Exception?
Reply | Threaded
Open this post in threaded view
|

Re: Too few memory segments provided exception

Maximilian Michels
Hi Shivani,

Flink doesn't have enough memory to perform a hash join. You need to provide Flink with more memory. You can either increase the "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction" to some value greater than 0.7 and smaller then 1.0. The first config variable allocates more overall memory for Flink; the latter changes the ratio between Flink managed memory (e.g. for hash join) and user memory (for you functions and Gelly's code).

If you run this inside an IDE, the memory is configured automatically and you don't have control over that at the moment. You could, however, start a local cluster (./bin/start-local) after you adjusted your flink-conf.yaml and run your programs against that configured cluster. You can do that either through your IDE using a RemoteEnvironment or by submitting the packaged JAR to the local cluster using the command-line tool (./bin/flink).

Hope that helps.

Cheers,
Max

On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <[hidden email]> wrote:
Hello,
 I am working on a problem which implements Adamic Adar Algorithm using Gelly.
I am running into this exception for all the Joins (including the one that are part of the reduceOnNeighbors function)

Too few memory segments provided. Hash Join needs at least 33 memory segments.


The problem persists even when I comment out some of the joins.

Even after using edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

as suggested by @AndraLungu the problem persists.

The code is


DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

        //get neighbors of each vertex in the HashSet for it's value
        computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
       
        //get vertices with updated values for the final Graph which will be used to get Adamic Edges
        Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());

        Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
                Graph.fromDataSet(Vertices, edges, env);
       
        //configure Vertex Centric Iteration
        VertexCentricConfiguration parameters = new VertexCentricConfiguration();

        parameters.setName("Find Adamic Adar Edge Weights");

        parameters.setDirection(EdgeDirection.ALL);
       
        //run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
        updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
       
        //Extract Vertices of the updated graph
        DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
       
        //Extract the list of Edges from the vertex values
        DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
       
        //Partial weights for the edges are added
        edg = edg.groupBy(0,1).reduce(new AdamGroup());

        //Graph is updated with the Adamic Adar Edges
        edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

Any idea how I could tackle this Exception?

Reply | Threaded
Open this post in threaded view
|

Re: Too few memory segments provided exception

Till Rohrmann

The taskmanager.memory.fraction you can also set from within the IDE by giving the corresponding configuration object to the LocalEnvironment using the setConfiguration method. However, the taskmanager.heap.mb is basically the -Xmx value with which you start your JVM. Usually, you can set this in your program run settings.

Cheers,
Till


On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

Flink doesn't have enough memory to perform a hash join. You need to provide Flink with more memory. You can either increase the "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction" to some value greater than 0.7 and smaller then 1.0. The first config variable allocates more overall memory for Flink; the latter changes the ratio between Flink managed memory (e.g. for hash join) and user memory (for you functions and Gelly's code).

If you run this inside an IDE, the memory is configured automatically and you don't have control over that at the moment. You could, however, start a local cluster (./bin/start-local) after you adjusted your flink-conf.yaml and run your programs against that configured cluster. You can do that either through your IDE using a RemoteEnvironment or by submitting the packaged JAR to the local cluster using the command-line tool (./bin/flink).

Hope that helps.

Cheers,
Max

On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <[hidden email]> wrote:
Hello,
 I am working on a problem which implements Adamic Adar Algorithm using Gelly.
I am running into this exception for all the Joins (including the one that are part of the reduceOnNeighbors function)

Too few memory segments provided. Hash Join needs at least 33 memory segments.


The problem persists even when I comment out some of the joins.

Even after using edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

as suggested by @AndraLungu the problem persists.

The code is


DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

        //get neighbors of each vertex in the HashSet for it's value
        computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
       
        //get vertices with updated values for the final Graph which will be used to get Adamic Edges
        Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());

        Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
                Graph.fromDataSet(Vertices, edges, env);
       
        //configure Vertex Centric Iteration
        VertexCentricConfiguration parameters = new VertexCentricConfiguration();

        parameters.setName("Find Adamic Adar Edge Weights");

        parameters.setDirection(EdgeDirection.ALL);
       
        //run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
        updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
       
        //Extract Vertices of the updated graph
        DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
       
        //Extract the list of Edges from the vertex values
        DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
       
        //Partial weights for the edges are added
        edg = edg.groupBy(0,1).reduce(new AdamGroup());

        //Graph is updated with the Adamic Adar Edges
        edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

Any idea how I could tackle this Exception?


Reply | Threaded
Open this post in threaded view
|

Re: Too few memory segments provided exception

Shivani Ghatge
In reply to this post by Maximilian Michels
Hello Maximilian,

Thanks for the suggestion. I will use it to check the program. But when I am creating a PR for the same implementation with a Test, I am getting the same error even on Travis build. So for that what would be the solution?

Here is my PR https://github.com/apache/flink/pull/923
And here is the Travis build status https://travis-ci.org/apache/flink/builds/71695078

Also on the IDE it is working fine in Collection execution mode.

Thanks and Regards,
Shivani

On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

Flink doesn't have enough memory to perform a hash join. You need to provide Flink with more memory. You can either increase the "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction" to some value greater than 0.7 and smaller then 1.0. The first config variable allocates more overall memory for Flink; the latter changes the ratio between Flink managed memory (e.g. for hash join) and user memory (for you functions and Gelly's code).

If you run this inside an IDE, the memory is configured automatically and you don't have control over that at the moment. You could, however, start a local cluster (./bin/start-local) after you adjusted your flink-conf.yaml and run your programs against that configured cluster. You can do that either through your IDE using a RemoteEnvironment or by submitting the packaged JAR to the local cluster using the command-line tool (./bin/flink).

Hope that helps.

Cheers,
Max

On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <[hidden email]> wrote:
Hello,
 I am working on a problem which implements Adamic Adar Algorithm using Gelly.
I am running into this exception for all the Joins (including the one that are part of the reduceOnNeighbors function)

Too few memory segments provided. Hash Join needs at least 33 memory segments.


The problem persists even when I comment out some of the joins.

Even after using edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

as suggested by @AndraLungu the problem persists.

The code is


DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

        //get neighbors of each vertex in the HashSet for it's value
        computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
       
        //get vertices with updated values for the final Graph which will be used to get Adamic Edges
        Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());

        Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
                Graph.fromDataSet(Vertices, edges, env);
       
        //configure Vertex Centric Iteration
        VertexCentricConfiguration parameters = new VertexCentricConfiguration();

        parameters.setName("Find Adamic Adar Edge Weights");

        parameters.setDirection(EdgeDirection.ALL);
       
        //run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
        updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
       
        //Extract Vertices of the updated graph
        DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
       
        //Extract the list of Edges from the vertex values
        DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
       
        //Partial weights for the edges are added
        edg = edg.groupBy(0,1).reduce(new AdamGroup());

        //Graph is updated with the Adamic Adar Edges
        edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

Any idea how I could tackle this Exception?


Reply | Threaded
Open this post in threaded view
|

Re: Too few memory segments provided exception

Andra Lungu
The behavior back then seems to be exactly what Shivani is experiencing at the moment. At that point I remember Fabian suggested to test subsets of the code. The problem is that too many operations are executed: joins, coGroups etc...
However, we are implementing examples and library methods here, so we actually need to test the `entire` functionality.

That issue never got fixed; Someone suggested increasing the memory allocated for tests, but nothing happened as far as I remember :|. Furthermore, I am not sure that this would be the solution because as more and more operators get added, Flink will again run out of memory (we don't know how big this memory fraction should ideally be).

It would be great if we could fix this problem :) Or if we can't, can someone suggest what we should do with such PRs? We certainly cannot merge code with failing tests...

Cheers,
Andra

On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge <[hidden email]> wrote:
Hello Maximilian,

Thanks for the suggestion. I will use it to check the program. But when I am creating a PR for the same implementation with a Test, I am getting the same error even on Travis build. So for that what would be the solution?

Here is my PR https://github.com/apache/flink/pull/923
And here is the Travis build status https://travis-ci.org/apache/flink/builds/71695078

Also on the IDE it is working fine in Collection execution mode.

Thanks and Regards,
Shivani

On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

Flink doesn't have enough memory to perform a hash join. You need to provide Flink with more memory. You can either increase the "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction" to some value greater than 0.7 and smaller then 1.0. The first config variable allocates more overall memory for Flink; the latter changes the ratio between Flink managed memory (e.g. for hash join) and user memory (for you functions and Gelly's code).

If you run this inside an IDE, the memory is configured automatically and you don't have control over that at the moment. You could, however, start a local cluster (./bin/start-local) after you adjusted your flink-conf.yaml and run your programs against that configured cluster. You can do that either through your IDE using a RemoteEnvironment or by submitting the packaged JAR to the local cluster using the command-line tool (./bin/flink).

Hope that helps.

Cheers,
Max

On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <[hidden email]> wrote:
Hello,
 I am working on a problem which implements Adamic Adar Algorithm using Gelly.
I am running into this exception for all the Joins (including the one that are part of the reduceOnNeighbors function)

Too few memory segments provided. Hash Join needs at least 33 memory segments.


The problem persists even when I comment out some of the joins.

Even after using edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

as suggested by @AndraLungu the problem persists.

The code is


DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

        //get neighbors of each vertex in the HashSet for it's value
        computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
       
        //get vertices with updated values for the final Graph which will be used to get Adamic Edges
        Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());

        Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
                Graph.fromDataSet(Vertices, edges, env);
       
        //configure Vertex Centric Iteration
        VertexCentricConfiguration parameters = new VertexCentricConfiguration();

        parameters.setName("Find Adamic Adar Edge Weights");

        parameters.setDirection(EdgeDirection.ALL);
       
        //run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
        updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
       
        //Extract Vertices of the updated graph
        DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
       
        //Extract the list of Edges from the vertex values
        DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
       
        //Partial weights for the edges are added
        edg = edg.groupBy(0,1).reduce(new AdamGroup());

        //Graph is updated with the Adamic Adar Edges
        edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

Any idea how I could tackle this Exception?



Reply | Threaded
Open this post in threaded view
|

Re: Too few memory segments provided exception

Maximilian Michels
In reply to this post by Shivani Ghatge
Hi Shivani,

The issue is that by the time the Hash Join is executed, the MutableHashTable cannot allocate enough memory segments. That means that your other operators are occupying them. It is fine that this also occurs on Travis because the workers there have limited memory as well.

Till suggested to change the memory fraction through the ExuectionEnvironment. Can you try that?

Cheers,
Max

On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge <[hidden email]> wrote:
Hello Maximilian,

Thanks for the suggestion. I will use it to check the program. But when I am creating a PR for the same implementation with a Test, I am getting the same error even on Travis build. So for that what would be the solution?

Here is my PR https://github.com/apache/flink/pull/923
And here is the Travis build status https://travis-ci.org/apache/flink/builds/71695078

Also on the IDE it is working fine in Collection execution mode.

Thanks and Regards,
Shivani

On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

Flink doesn't have enough memory to perform a hash join. You need to provide Flink with more memory. You can either increase the "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction" to some value greater than 0.7 and smaller then 1.0. The first config variable allocates more overall memory for Flink; the latter changes the ratio between Flink managed memory (e.g. for hash join) and user memory (for you functions and Gelly's code).

If you run this inside an IDE, the memory is configured automatically and you don't have control over that at the moment. You could, however, start a local cluster (./bin/start-local) after you adjusted your flink-conf.yaml and run your programs against that configured cluster. You can do that either through your IDE using a RemoteEnvironment or by submitting the packaged JAR to the local cluster using the command-line tool (./bin/flink).

Hope that helps.

Cheers,
Max

On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <[hidden email]> wrote:
Hello,
 I am working on a problem which implements Adamic Adar Algorithm using Gelly.
I am running into this exception for all the Joins (including the one that are part of the reduceOnNeighbors function)

Too few memory segments provided. Hash Join needs at least 33 memory segments.


The problem persists even when I comment out some of the joins.

Even after using edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

as suggested by @AndraLungu the problem persists.

The code is


DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

        //get neighbors of each vertex in the HashSet for it's value
        computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
       
        //get vertices with updated values for the final Graph which will be used to get Adamic Edges
        Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());

        Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
                Graph.fromDataSet(Vertices, edges, env);
       
        //configure Vertex Centric Iteration
        VertexCentricConfiguration parameters = new VertexCentricConfiguration();

        parameters.setName("Find Adamic Adar Edge Weights");

        parameters.setDirection(EdgeDirection.ALL);
       
        //run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
        updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
       
        //Extract Vertices of the updated graph
        DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
       
        //Extract the list of Edges from the vertex values
        DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
       
        //Partial weights for the edges are added
        edg = edg.groupBy(0,1).reduce(new AdamGroup());

        //Graph is updated with the Adamic Adar Edges
        edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

Any idea how I could tackle this Exception?



Reply | Threaded
Open this post in threaded view
|

Re: Too few memory segments provided exception

Vasiliki Kalavri
Hi Shivani,

why are you using a vertex-centric iteration to compute the approximate Adamic-Adar?
It's not an iterative computation :) 

In fact, it should be as complex (in terms of operators) as the exact Adamic-Adar, only more efficient because of the different neighborhood representation. Are you having the same problem with the exact computation?

Cheers,
Vasia.

On 20 July 2015 at 14:41, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

The issue is that by the time the Hash Join is executed, the MutableHashTable cannot allocate enough memory segments. That means that your other operators are occupying them. It is fine that this also occurs on Travis because the workers there have limited memory as well.

Till suggested to change the memory fraction through the ExuectionEnvironment. Can you try that?

Cheers,
Max

On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge <[hidden email]> wrote:
Hello Maximilian,

Thanks for the suggestion. I will use it to check the program. But when I am creating a PR for the same implementation with a Test, I am getting the same error even on Travis build. So for that what would be the solution?

Here is my PR https://github.com/apache/flink/pull/923
And here is the Travis build status https://travis-ci.org/apache/flink/builds/71695078

Also on the IDE it is working fine in Collection execution mode.

Thanks and Regards,
Shivani

On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

Flink doesn't have enough memory to perform a hash join. You need to provide Flink with more memory. You can either increase the "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction" to some value greater than 0.7 and smaller then 1.0. The first config variable allocates more overall memory for Flink; the latter changes the ratio between Flink managed memory (e.g. for hash join) and user memory (for you functions and Gelly's code).

If you run this inside an IDE, the memory is configured automatically and you don't have control over that at the moment. You could, however, start a local cluster (./bin/start-local) after you adjusted your flink-conf.yaml and run your programs against that configured cluster. You can do that either through your IDE using a RemoteEnvironment or by submitting the packaged JAR to the local cluster using the command-line tool (./bin/flink).

Hope that helps.

Cheers,
Max

On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <[hidden email]> wrote:
Hello,
 I am working on a problem which implements Adamic Adar Algorithm using Gelly.
I am running into this exception for all the Joins (including the one that are part of the reduceOnNeighbors function)

Too few memory segments provided. Hash Join needs at least 33 memory segments.


The problem persists even when I comment out some of the joins.

Even after using edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

as suggested by @AndraLungu the problem persists.

The code is


DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

        //get neighbors of each vertex in the HashSet for it's value
        computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
       
        //get vertices with updated values for the final Graph which will be used to get Adamic Edges
        Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());

        Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
                Graph.fromDataSet(Vertices, edges, env);
       
        //configure Vertex Centric Iteration
        VertexCentricConfiguration parameters = new VertexCentricConfiguration();

        parameters.setName("Find Adamic Adar Edge Weights");

        parameters.setDirection(EdgeDirection.ALL);
       
        //run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
        updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
       
        //Extract Vertices of the updated graph
        DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
       
        //Extract the list of Edges from the vertex values
        DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
       
        //Partial weights for the edges are added
        edg = edg.groupBy(0,1).reduce(new AdamGroup());

        //Graph is updated with the Adamic Adar Edges
        edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

Any idea how I could tackle this Exception?




Reply | Threaded
Open this post in threaded view
|

Re: Too few memory segments provided exception

Andra Lungu
I also questioned the vertex-centric approach before. The exact computation does not throw this exception so I guess adapting the approximate version will do the trick [I also suggested improving the algorithm to use less operators offline].

However, the issue still persists. We saw it in Affinity Propagation as well... So even if the problem will disappear for this example, I am curious how we should handle it in the future.  

On Mon, Jul 20, 2015 at 3:15 PM, Vasiliki Kalavri <[hidden email]> wrote:
Hi Shivani,

why are you using a vertex-centric iteration to compute the approximate Adamic-Adar?
It's not an iterative computation :) 

In fact, it should be as complex (in terms of operators) as the exact Adamic-Adar, only more efficient because of the different neighborhood representation. Are you having the same problem with the exact computation?

Cheers,
Vasia.

On 20 July 2015 at 14:41, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

The issue is that by the time the Hash Join is executed, the MutableHashTable cannot allocate enough memory segments. That means that your other operators are occupying them. It is fine that this also occurs on Travis because the workers there have limited memory as well.

Till suggested to change the memory fraction through the ExuectionEnvironment. Can you try that?

Cheers,
Max

On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge <[hidden email]> wrote:
Hello Maximilian,

Thanks for the suggestion. I will use it to check the program. But when I am creating a PR for the same implementation with a Test, I am getting the same error even on Travis build. So for that what would be the solution?

Here is my PR https://github.com/apache/flink/pull/923
And here is the Travis build status https://travis-ci.org/apache/flink/builds/71695078

Also on the IDE it is working fine in Collection execution mode.

Thanks and Regards,
Shivani

On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

Flink doesn't have enough memory to perform a hash join. You need to provide Flink with more memory. You can either increase the "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction" to some value greater than 0.7 and smaller then 1.0. The first config variable allocates more overall memory for Flink; the latter changes the ratio between Flink managed memory (e.g. for hash join) and user memory (for you functions and Gelly's code).

If you run this inside an IDE, the memory is configured automatically and you don't have control over that at the moment. You could, however, start a local cluster (./bin/start-local) after you adjusted your flink-conf.yaml and run your programs against that configured cluster. You can do that either through your IDE using a RemoteEnvironment or by submitting the packaged JAR to the local cluster using the command-line tool (./bin/flink).

Hope that helps.

Cheers,
Max

On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <[hidden email]> wrote:
Hello,
 I am working on a problem which implements Adamic Adar Algorithm using Gelly.
I am running into this exception for all the Joins (including the one that are part of the reduceOnNeighbors function)

Too few memory segments provided. Hash Join needs at least 33 memory segments.


The problem persists even when I comment out some of the joins.

Even after using edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

as suggested by @AndraLungu the problem persists.

The code is


DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

        //get neighbors of each vertex in the HashSet for it's value
        computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
       
        //get vertices with updated values for the final Graph which will be used to get Adamic Edges
        Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());

        Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
                Graph.fromDataSet(Vertices, edges, env);
       
        //configure Vertex Centric Iteration
        VertexCentricConfiguration parameters = new VertexCentricConfiguration();

        parameters.setName("Find Adamic Adar Edge Weights");

        parameters.setDirection(EdgeDirection.ALL);
       
        //run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
        updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
       
        //Extract Vertices of the updated graph
        DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
       
        //Extract the list of Edges from the vertex values
        DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
       
        //Partial weights for the edges are added
        edg = edg.groupBy(0,1).reduce(new AdamGroup());

        //Graph is updated with the Adamic Adar Edges
        edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

Any idea how I could tackle this Exception?





Reply | Threaded
Open this post in threaded view
|

Re: Too few memory segments provided exception

Vasiliki Kalavri
I believe there was some work in progress to reduce memory fragmentation and solve similar problems.
Anyone knows what's happening with that?

On 20 July 2015 at 16:29, Andra Lungu <[hidden email]> wrote:
I also questioned the vertex-centric approach before. The exact computation does not throw this exception so I guess adapting the approximate version will do the trick [I also suggested improving the algorithm to use less operators offline].

However, the issue still persists. We saw it in Affinity Propagation as well... So even if the problem will disappear for this example, I am curious how we should handle it in the future.  

On Mon, Jul 20, 2015 at 3:15 PM, Vasiliki Kalavri <[hidden email]> wrote:
Hi Shivani,

why are you using a vertex-centric iteration to compute the approximate Adamic-Adar?
It's not an iterative computation :) 

In fact, it should be as complex (in terms of operators) as the exact Adamic-Adar, only more efficient because of the different neighborhood representation. Are you having the same problem with the exact computation?

Cheers,
Vasia.

On 20 July 2015 at 14:41, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

The issue is that by the time the Hash Join is executed, the MutableHashTable cannot allocate enough memory segments. That means that your other operators are occupying them. It is fine that this also occurs on Travis because the workers there have limited memory as well.

Till suggested to change the memory fraction through the ExuectionEnvironment. Can you try that?

Cheers,
Max

On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge <[hidden email]> wrote:
Hello Maximilian,

Thanks for the suggestion. I will use it to check the program. But when I am creating a PR for the same implementation with a Test, I am getting the same error even on Travis build. So for that what would be the solution?

Here is my PR https://github.com/apache/flink/pull/923
And here is the Travis build status https://travis-ci.org/apache/flink/builds/71695078

Also on the IDE it is working fine in Collection execution mode.

Thanks and Regards,
Shivani

On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

Flink doesn't have enough memory to perform a hash join. You need to provide Flink with more memory. You can either increase the "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction" to some value greater than 0.7 and smaller then 1.0. The first config variable allocates more overall memory for Flink; the latter changes the ratio between Flink managed memory (e.g. for hash join) and user memory (for you functions and Gelly's code).

If you run this inside an IDE, the memory is configured automatically and you don't have control over that at the moment. You could, however, start a local cluster (./bin/start-local) after you adjusted your flink-conf.yaml and run your programs against that configured cluster. You can do that either through your IDE using a RemoteEnvironment or by submitting the packaged JAR to the local cluster using the command-line tool (./bin/flink).

Hope that helps.

Cheers,
Max

On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <[hidden email]> wrote:
Hello,
 I am working on a problem which implements Adamic Adar Algorithm using Gelly.
I am running into this exception for all the Joins (including the one that are part of the reduceOnNeighbors function)

Too few memory segments provided. Hash Join needs at least 33 memory segments.


The problem persists even when I comment out some of the joins.

Even after using edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

as suggested by @AndraLungu the problem persists.

The code is


DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

        //get neighbors of each vertex in the HashSet for it's value
        computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
       
        //get vertices with updated values for the final Graph which will be used to get Adamic Edges
        Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());

        Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
                Graph.fromDataSet(Vertices, edges, env);
       
        //configure Vertex Centric Iteration
        VertexCentricConfiguration parameters = new VertexCentricConfiguration();

        parameters.setName("Find Adamic Adar Edge Weights");

        parameters.setDirection(EdgeDirection.ALL);
       
        //run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
        updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
       
        //Extract Vertices of the updated graph
        DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
       
        //Extract the list of Edges from the vertex values
        DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
       
        //Partial weights for the edges are added
        edg = edg.groupBy(0,1).reduce(new AdamGroup());

        //Graph is updated with the Adamic Adar Edges
        edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

Any idea how I could tackle this Exception?






Reply | Threaded
Open this post in threaded view
|

Re: Too few memory segments provided exception

Shivani Ghatge
In reply to this post by Vasiliki Kalavri
Hello Vasia,

As I had mentioned before, I need a BloomFilter as well as a HashSet for the approximation to work. In the exact solution I am getting two HashSets and comparing them. In approximate version, if we get two BloomFilters then we have no way to compare the neighborhood sets.

I thought we agreed that the BloomFilters are to be sent as messages to the vertices?

The exact version is passing all the tests.

On removing the final GroupReduce the program is working but I need it to add the Partial Adamic Adar edges weights.

On Mon, Jul 20, 2015 at 3:15 PM, Vasiliki Kalavri <[hidden email]> wrote:
Hi Shivani,

why are you using a vertex-centric iteration to compute the approximate Adamic-Adar?
It's not an iterative computation :) 

In fact, it should be as complex (in terms of operators) as the exact Adamic-Adar, only more efficient because of the different neighborhood representation. Are you having the same problem with the exact computation?

Cheers,
Vasia.

On 20 July 2015 at 14:41, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

The issue is that by the time the Hash Join is executed, the MutableHashTable cannot allocate enough memory segments. That means that your other operators are occupying them. It is fine that this also occurs on Travis because the workers there have limited memory as well.

Till suggested to change the memory fraction through the ExuectionEnvironment. Can you try that?

Cheers,
Max

On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge <[hidden email]> wrote:
Hello Maximilian,

Thanks for the suggestion. I will use it to check the program. But when I am creating a PR for the same implementation with a Test, I am getting the same error even on Travis build. So for that what would be the solution?

Here is my PR https://github.com/apache/flink/pull/923
And here is the Travis build status https://travis-ci.org/apache/flink/builds/71695078

Also on the IDE it is working fine in Collection execution mode.

Thanks and Regards,
Shivani

On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

Flink doesn't have enough memory to perform a hash join. You need to provide Flink with more memory. You can either increase the "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction" to some value greater than 0.7 and smaller then 1.0. The first config variable allocates more overall memory for Flink; the latter changes the ratio between Flink managed memory (e.g. for hash join) and user memory (for you functions and Gelly's code).

If you run this inside an IDE, the memory is configured automatically and you don't have control over that at the moment. You could, however, start a local cluster (./bin/start-local) after you adjusted your flink-conf.yaml and run your programs against that configured cluster. You can do that either through your IDE using a RemoteEnvironment or by submitting the packaged JAR to the local cluster using the command-line tool (./bin/flink).

Hope that helps.

Cheers,
Max

On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <[hidden email]> wrote:
Hello,
 I am working on a problem which implements Adamic Adar Algorithm using Gelly.
I am running into this exception for all the Joins (including the one that are part of the reduceOnNeighbors function)

Too few memory segments provided. Hash Join needs at least 33 memory segments.


The problem persists even when I comment out some of the joins.

Even after using edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

as suggested by @AndraLungu the problem persists.

The code is


DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

        //get neighbors of each vertex in the HashSet for it's value
        computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
       
        //get vertices with updated values for the final Graph which will be used to get Adamic Edges
        Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());

        Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
                Graph.fromDataSet(Vertices, edges, env);
       
        //configure Vertex Centric Iteration
        VertexCentricConfiguration parameters = new VertexCentricConfiguration();

        parameters.setName("Find Adamic Adar Edge Weights");

        parameters.setDirection(EdgeDirection.ALL);
       
        //run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
        updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
       
        //Extract Vertices of the updated graph
        DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
       
        //Extract the list of Edges from the vertex values
        DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
       
        //Partial weights for the edges are added
        edg = edg.groupBy(0,1).reduce(new AdamGroup());

        //Graph is updated with the Adamic Adar Edges
        edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

Any idea how I could tackle this Exception?





Reply | Threaded
Open this post in threaded view
|

Re: Too few memory segments provided exception

Shivani Ghatge
Also the example of Jaccard that you had linked me to used VertexCentric configuration which I understand is because that api only uses VertexCentricIteration for all the operations? But I think that is the best way in order to know what neighbors belong to the BloomFilter?

On Mon, Jul 20, 2015 at 3:43 PM, Shivani Ghatge <[hidden email]> wrote:
Hello Vasia,

As I had mentioned before, I need a BloomFilter as well as a HashSet for the approximation to work. In the exact solution I am getting two HashSets and comparing them. In approximate version, if we get two BloomFilters then we have no way to compare the neighborhood sets.

I thought we agreed that the BloomFilters are to be sent as messages to the vertices?

The exact version is passing all the tests.

On removing the final GroupReduce the program is working but I need it to add the Partial Adamic Adar edges weights.

On Mon, Jul 20, 2015 at 3:15 PM, Vasiliki Kalavri <[hidden email]> wrote:
Hi Shivani,

why are you using a vertex-centric iteration to compute the approximate Adamic-Adar?
It's not an iterative computation :) 

In fact, it should be as complex (in terms of operators) as the exact Adamic-Adar, only more efficient because of the different neighborhood representation. Are you having the same problem with the exact computation?

Cheers,
Vasia.

On 20 July 2015 at 14:41, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

The issue is that by the time the Hash Join is executed, the MutableHashTable cannot allocate enough memory segments. That means that your other operators are occupying them. It is fine that this also occurs on Travis because the workers there have limited memory as well.

Till suggested to change the memory fraction through the ExuectionEnvironment. Can you try that?

Cheers,
Max

On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge <[hidden email]> wrote:
Hello Maximilian,

Thanks for the suggestion. I will use it to check the program. But when I am creating a PR for the same implementation with a Test, I am getting the same error even on Travis build. So for that what would be the solution?

Here is my PR https://github.com/apache/flink/pull/923
And here is the Travis build status https://travis-ci.org/apache/flink/builds/71695078

Also on the IDE it is working fine in Collection execution mode.

Thanks and Regards,
Shivani

On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

Flink doesn't have enough memory to perform a hash join. You need to provide Flink with more memory. You can either increase the "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction" to some value greater than 0.7 and smaller then 1.0. The first config variable allocates more overall memory for Flink; the latter changes the ratio between Flink managed memory (e.g. for hash join) and user memory (for you functions and Gelly's code).

If you run this inside an IDE, the memory is configured automatically and you don't have control over that at the moment. You could, however, start a local cluster (./bin/start-local) after you adjusted your flink-conf.yaml and run your programs against that configured cluster. You can do that either through your IDE using a RemoteEnvironment or by submitting the packaged JAR to the local cluster using the command-line tool (./bin/flink).

Hope that helps.

Cheers,
Max

On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <[hidden email]> wrote:
Hello,
 I am working on a problem which implements Adamic Adar Algorithm using Gelly.
I am running into this exception for all the Joins (including the one that are part of the reduceOnNeighbors function)

Too few memory segments provided. Hash Join needs at least 33 memory segments.


The problem persists even when I comment out some of the joins.

Even after using edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

as suggested by @AndraLungu the problem persists.

The code is


DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

        //get neighbors of each vertex in the HashSet for it's value
        computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
       
        //get vertices with updated values for the final Graph which will be used to get Adamic Edges
        Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());

        Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
                Graph.fromDataSet(Vertices, edges, env);
       
        //configure Vertex Centric Iteration
        VertexCentricConfiguration parameters = new VertexCentricConfiguration();

        parameters.setName("Find Adamic Adar Edge Weights");

        parameters.setDirection(EdgeDirection.ALL);
       
        //run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
        updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
       
        //Extract Vertices of the updated graph
        DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
       
        //Extract the list of Edges from the vertex values
        DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
       
        //Partial weights for the edges are added
        edg = edg.groupBy(0,1).reduce(new AdamGroup());

        //Graph is updated with the Adamic Adar Edges
        edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

Any idea how I could tackle this Exception?






Reply | Threaded
Open this post in threaded view
|

Re: Too few memory segments provided exception

Vasiliki Kalavri
Hi Shivani,

the Jaccard example is implemented in Giraph, and therefore uses iterations.
However, in Gelly we are not forced to do that for non-iterative computations.

I see that there is some confusion with the implementation specifics.
Let me try to write down some skeleton code / detailed description on how to do this properly in Gelly and let's move this discussion to the corresponding issue.

Cheers,
-Vasia.

On 20 July 2015 at 16:45, Shivani Ghatge <[hidden email]> wrote:
Also the example of Jaccard that you had linked me to used VertexCentric configuration which I understand is because that api only uses VertexCentricIteration for all the operations? But I think that is the best way in order to know what neighbors belong to the BloomFilter?

On Mon, Jul 20, 2015 at 3:43 PM, Shivani Ghatge <[hidden email]> wrote:
Hello Vasia,

As I had mentioned before, I need a BloomFilter as well as a HashSet for the approximation to work. In the exact solution I am getting two HashSets and comparing them. In approximate version, if we get two BloomFilters then we have no way to compare the neighborhood sets.

I thought we agreed that the BloomFilters are to be sent as messages to the vertices?

The exact version is passing all the tests.

On removing the final GroupReduce the program is working but I need it to add the Partial Adamic Adar edges weights.

On Mon, Jul 20, 2015 at 3:15 PM, Vasiliki Kalavri <[hidden email]> wrote:
Hi Shivani,

why are you using a vertex-centric iteration to compute the approximate Adamic-Adar?
It's not an iterative computation :) 

In fact, it should be as complex (in terms of operators) as the exact Adamic-Adar, only more efficient because of the different neighborhood representation. Are you having the same problem with the exact computation?

Cheers,
Vasia.

On 20 July 2015 at 14:41, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

The issue is that by the time the Hash Join is executed, the MutableHashTable cannot allocate enough memory segments. That means that your other operators are occupying them. It is fine that this also occurs on Travis because the workers there have limited memory as well.

Till suggested to change the memory fraction through the ExuectionEnvironment. Can you try that?

Cheers,
Max

On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge <[hidden email]> wrote:
Hello Maximilian,

Thanks for the suggestion. I will use it to check the program. But when I am creating a PR for the same implementation with a Test, I am getting the same error even on Travis build. So for that what would be the solution?

Here is my PR https://github.com/apache/flink/pull/923
And here is the Travis build status https://travis-ci.org/apache/flink/builds/71695078

Also on the IDE it is working fine in Collection execution mode.

Thanks and Regards,
Shivani

On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

Flink doesn't have enough memory to perform a hash join. You need to provide Flink with more memory. You can either increase the "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction" to some value greater than 0.7 and smaller then 1.0. The first config variable allocates more overall memory for Flink; the latter changes the ratio between Flink managed memory (e.g. for hash join) and user memory (for you functions and Gelly's code).

If you run this inside an IDE, the memory is configured automatically and you don't have control over that at the moment. You could, however, start a local cluster (./bin/start-local) after you adjusted your flink-conf.yaml and run your programs against that configured cluster. You can do that either through your IDE using a RemoteEnvironment or by submitting the packaged JAR to the local cluster using the command-line tool (./bin/flink).

Hope that helps.

Cheers,
Max

On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <[hidden email]> wrote:
Hello,
 I am working on a problem which implements Adamic Adar Algorithm using Gelly.
I am running into this exception for all the Joins (including the one that are part of the reduceOnNeighbors function)

Too few memory segments provided. Hash Join needs at least 33 memory segments.


The problem persists even when I comment out some of the joins.

Even after using edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

as suggested by @AndraLungu the problem persists.

The code is


DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

        //get neighbors of each vertex in the HashSet for it's value
        computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
       
        //get vertices with updated values for the final Graph which will be used to get Adamic Edges
        Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());

        Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
                Graph.fromDataSet(Vertices, edges, env);
       
        //configure Vertex Centric Iteration
        VertexCentricConfiguration parameters = new VertexCentricConfiguration();

        parameters.setName("Find Adamic Adar Edge Weights");

        parameters.setDirection(EdgeDirection.ALL);
       
        //run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
        updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
       
        //Extract Vertices of the updated graph
        DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
       
        //Extract the list of Edges from the vertex values
        DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
       
        //Partial weights for the edges are added
        edg = edg.groupBy(0,1).reduce(new AdamGroup());

        //Graph is updated with the Adamic Adar Edges
        edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

Any idea how I could tackle this Exception?







Reply | Threaded
Open this post in threaded view
|

Re: Too few memory segments provided exception

Shivani Ghatge
In reply to this post by Shivani Ghatge
Hello Vasia,

I will adapt the exact method for BloomFilter. (I think it can be done. Sorry. My mistake). 


On Mon, Jul 20, 2015 at 3:45 PM, Shivani Ghatge <[hidden email]> wrote:
Also the example of Jaccard that you had linked me to used VertexCentric configuration which I understand is because that api only uses VertexCentricIteration for all the operations? But I think that is the best way in order to know what neighbors belong to the BloomFilter?

On Mon, Jul 20, 2015 at 3:43 PM, Shivani Ghatge <[hidden email]> wrote:
Hello Vasia,

As I had mentioned before, I need a BloomFilter as well as a HashSet for the approximation to work. In the exact solution I am getting two HashSets and comparing them. In approximate version, if we get two BloomFilters then we have no way to compare the neighborhood sets.

I thought we agreed that the BloomFilters are to be sent as messages to the vertices?

The exact version is passing all the tests.

On removing the final GroupReduce the program is working but I need it to add the Partial Adamic Adar edges weights.

On Mon, Jul 20, 2015 at 3:15 PM, Vasiliki Kalavri <[hidden email]> wrote:
Hi Shivani,

why are you using a vertex-centric iteration to compute the approximate Adamic-Adar?
It's not an iterative computation :) 

In fact, it should be as complex (in terms of operators) as the exact Adamic-Adar, only more efficient because of the different neighborhood representation. Are you having the same problem with the exact computation?

Cheers,
Vasia.

On 20 July 2015 at 14:41, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

The issue is that by the time the Hash Join is executed, the MutableHashTable cannot allocate enough memory segments. That means that your other operators are occupying them. It is fine that this also occurs on Travis because the workers there have limited memory as well.

Till suggested to change the memory fraction through the ExuectionEnvironment. Can you try that?

Cheers,
Max

On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge <[hidden email]> wrote:
Hello Maximilian,

Thanks for the suggestion. I will use it to check the program. But when I am creating a PR for the same implementation with a Test, I am getting the same error even on Travis build. So for that what would be the solution?

Here is my PR https://github.com/apache/flink/pull/923
And here is the Travis build status https://travis-ci.org/apache/flink/builds/71695078

Also on the IDE it is working fine in Collection execution mode.

Thanks and Regards,
Shivani

On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

Flink doesn't have enough memory to perform a hash join. You need to provide Flink with more memory. You can either increase the "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction" to some value greater than 0.7 and smaller then 1.0. The first config variable allocates more overall memory for Flink; the latter changes the ratio between Flink managed memory (e.g. for hash join) and user memory (for you functions and Gelly's code).

If you run this inside an IDE, the memory is configured automatically and you don't have control over that at the moment. You could, however, start a local cluster (./bin/start-local) after you adjusted your flink-conf.yaml and run your programs against that configured cluster. You can do that either through your IDE using a RemoteEnvironment or by submitting the packaged JAR to the local cluster using the command-line tool (./bin/flink).

Hope that helps.

Cheers,
Max

On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <[hidden email]> wrote:
Hello,
 I am working on a problem which implements Adamic Adar Algorithm using Gelly.
I am running into this exception for all the Joins (including the one that are part of the reduceOnNeighbors function)

Too few memory segments provided. Hash Join needs at least 33 memory segments.


The problem persists even when I comment out some of the joins.

Even after using edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

as suggested by @AndraLungu the problem persists.

The code is


DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

        //get neighbors of each vertex in the HashSet for it's value
        computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
       
        //get vertices with updated values for the final Graph which will be used to get Adamic Edges
        Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());

        Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
                Graph.fromDataSet(Vertices, edges, env);
       
        //configure Vertex Centric Iteration
        VertexCentricConfiguration parameters = new VertexCentricConfiguration();

        parameters.setName("Find Adamic Adar Edge Weights");

        parameters.setDirection(EdgeDirection.ALL);
       
        //run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
        updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
       
        //Extract Vertices of the updated graph
        DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
       
        //Extract the list of Edges from the vertex values
        DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
       
        //Partial weights for the edges are added
        edg = edg.groupBy(0,1).reduce(new AdamGroup());

        //Graph is updated with the Adamic Adar Edges
        edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

Any idea how I could tackle this Exception?







Reply | Threaded
Open this post in threaded view
|

Re: Too few memory segments provided exception

Shivani Ghatge
But it will need to build BloomFilters for each vertex for each edge so idk how efficient that would be.

On Mon, Jul 20, 2015 at 4:02 PM, Shivani Ghatge <[hidden email]> wrote:
Hello Vasia,

I will adapt the exact method for BloomFilter. (I think it can be done. Sorry. My mistake). 


On Mon, Jul 20, 2015 at 3:45 PM, Shivani Ghatge <[hidden email]> wrote:
Also the example of Jaccard that you had linked me to used VertexCentric configuration which I understand is because that api only uses VertexCentricIteration for all the operations? But I think that is the best way in order to know what neighbors belong to the BloomFilter?

On Mon, Jul 20, 2015 at 3:43 PM, Shivani Ghatge <[hidden email]> wrote:
Hello Vasia,

As I had mentioned before, I need a BloomFilter as well as a HashSet for the approximation to work. In the exact solution I am getting two HashSets and comparing them. In approximate version, if we get two BloomFilters then we have no way to compare the neighborhood sets.

I thought we agreed that the BloomFilters are to be sent as messages to the vertices?

The exact version is passing all the tests.

On removing the final GroupReduce the program is working but I need it to add the Partial Adamic Adar edges weights.

On Mon, Jul 20, 2015 at 3:15 PM, Vasiliki Kalavri <[hidden email]> wrote:
Hi Shivani,

why are you using a vertex-centric iteration to compute the approximate Adamic-Adar?
It's not an iterative computation :) 

In fact, it should be as complex (in terms of operators) as the exact Adamic-Adar, only more efficient because of the different neighborhood representation. Are you having the same problem with the exact computation?

Cheers,
Vasia.

On 20 July 2015 at 14:41, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

The issue is that by the time the Hash Join is executed, the MutableHashTable cannot allocate enough memory segments. That means that your other operators are occupying them. It is fine that this also occurs on Travis because the workers there have limited memory as well.

Till suggested to change the memory fraction through the ExuectionEnvironment. Can you try that?

Cheers,
Max

On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge <[hidden email]> wrote:
Hello Maximilian,

Thanks for the suggestion. I will use it to check the program. But when I am creating a PR for the same implementation with a Test, I am getting the same error even on Travis build. So for that what would be the solution?

Here is my PR https://github.com/apache/flink/pull/923
And here is the Travis build status https://travis-ci.org/apache/flink/builds/71695078

Also on the IDE it is working fine in Collection execution mode.

Thanks and Regards,
Shivani

On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels <[hidden email]> wrote:
Hi Shivani,

Flink doesn't have enough memory to perform a hash join. You need to provide Flink with more memory. You can either increase the "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction" to some value greater than 0.7 and smaller then 1.0. The first config variable allocates more overall memory for Flink; the latter changes the ratio between Flink managed memory (e.g. for hash join) and user memory (for you functions and Gelly's code).

If you run this inside an IDE, the memory is configured automatically and you don't have control over that at the moment. You could, however, start a local cluster (./bin/start-local) after you adjusted your flink-conf.yaml and run your programs against that configured cluster. You can do that either through your IDE using a RemoteEnvironment or by submitting the packaged JAR to the local cluster using the command-line tool (./bin/flink).

Hope that helps.

Cheers,
Max

On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <[hidden email]> wrote:
Hello,
 I am working on a problem which implements Adamic Adar Algorithm using Gelly.
I am running into this exception for all the Joins (including the one that are part of the reduceOnNeighbors function)

Too few memory segments provided. Hash Join needs at least 33 memory segments.


The problem persists even when I comment out some of the joins.

Even after using edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

as suggested by @AndraLungu the problem persists.

The code is


DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

        //get neighbors of each vertex in the HashSet for it's value
        computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
       
        //get vertices with updated values for the final Graph which will be used to get Adamic Edges
        Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());

        Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
                Graph.fromDataSet(Vertices, edges, env);
       
        //configure Vertex Centric Iteration
        VertexCentricConfiguration parameters = new VertexCentricConfiguration();

        parameters.setName("Find Adamic Adar Edge Weights");

        parameters.setDirection(EdgeDirection.ALL);
       
        //run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
        updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
       
        //Extract Vertices of the updated graph
        DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
       
        //Extract the list of Edges from the vertex values
        DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
       
        //Partial weights for the edges are added
        edg = edg.groupBy(0,1).reduce(new AdamGroup());

        //Graph is updated with the Adamic Adar Edges
        edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());

Any idea how I could tackle this Exception?








Reply | Threaded
Open this post in threaded view
|

Re: Too few memory segments provided exception

Ufuk Celebi
In reply to this post by Vasiliki Kalavri
BTW we should add an entry for this to the faq and point to the configuration or faq entry in the exception message.

On 20 Jul 2015, at 15:15, Vasiliki Kalavri <[hidden email]> wrote:

> Hi Shivani,
>
> why are you using a vertex-centric iteration to compute the approximate Adamic-Adar?
> It's not an iterative computation :)
>
> In fact, it should be as complex (in terms of operators) as the exact Adamic-Adar, only more efficient because of the different neighborhood representation. Are you having the same problem with the exact computation?
>
> Cheers,
> Vasia.
>
> On 20 July 2015 at 14:41, Maximilian Michels <[hidden email]> wrote:
> Hi Shivani,
>
> The issue is that by the time the Hash Join is executed, the MutableHashTable cannot allocate enough memory segments. That means that your other operators are occupying them. It is fine that this also occurs on Travis because the workers there have limited memory as well.
>
> Till suggested to change the memory fraction through the ExuectionEnvironment. Can you try that?
>
> Cheers,
> Max
>
> On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge <[hidden email]> wrote:
> Hello Maximilian,
>
> Thanks for the suggestion. I will use it to check the program. But when I am creating a PR for the same implementation with a Test, I am getting the same error even on Travis build. So for that what would be the solution?
>
> Here is my PR https://github.com/apache/flink/pull/923
> And here is the Travis build status https://travis-ci.org/apache/flink/builds/71695078
>
> Also on the IDE it is working fine in Collection execution mode.
>
> Thanks and Regards,
> Shivani
>
> On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels <[hidden email]> wrote:
> Hi Shivani,
>
> Flink doesn't have enough memory to perform a hash join. You need to provide Flink with more memory. You can either increase the "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction" to some value greater than 0.7 and smaller then 1.0. The first config variable allocates more overall memory for Flink; the latter changes the ratio between Flink managed memory (e.g. for hash join) and user memory (for you functions and Gelly's code).
>
> If you run this inside an IDE, the memory is configured automatically and you don't have control over that at the moment. You could, however, start a local cluster (./bin/start-local) after you adjusted your flink-conf.yaml and run your programs against that configured cluster. You can do that either through your IDE using a RemoteEnvironment or by submitting the packaged JAR to the local cluster using the command-line tool (./bin/flink).
>
> Hope that helps.
>
> Cheers,
> Max
>
> On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <[hidden email]> wrote:
> Hello,
>  I am working on a problem which implements Adamic Adar Algorithm using Gelly.
> I am running into this exception for all the Joins (including the one that are part of the reduceOnNeighbors function)
>
> Too few memory segments provided. Hash Join needs at least 33 memory segments.
>
>
> The problem persists even when I comment out some of the joins.
>
> Even after using edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());
>
> as suggested by @AndraLungu the problem persists.
>
> The code is
>
>
> DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();
>
>         //get neighbors of each vertex in the HashSet for it's value
>         computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
>        
>         //get vertices with updated values for the final Graph which will be used to get Adamic Edges
>         Vertices = computedNeighbors.join(degrees, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinNeighborDegrees());
>
>         Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>, Double> updatedGraph =
>                 Graph.fromDataSet(Vertices, edges, env);
>        
>         //configure Vertex Centric Iteration
>         VertexCentricConfiguration parameters = new VertexCentricConfiguration();
>
>         parameters.setName("Find Adamic Adar Edge Weights");
>
>         parameters.setDirection(EdgeDirection.ALL);
>        
>         //run Vertex Centric Iteration to get the Adamic Adar Edges into the vertex Value
>         updatedGraph = updatedGraph.runVertexCentricIteration(new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(), 1, parameters);
>        
>         //Extract Vertices of the updated graph
>         DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long, Double>>>>> vertices = updatedGraph.getVertices();
>        
>         //Extract the list of Edges from the vertex values
>         DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new GetAdamicList());
>        
>         //Partial weights for the edges are added
>         edg = edg.groupBy(0,1).reduce(new AdamGroup());
>
>         //Graph is updated with the Adamic Adar Edges
>         edg = edg.join(graph.getEdges(), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new JoinEdge());
>
> Any idea how I could tackle this Exception?
>
>
>
>

Reply | Threaded
Open this post in threaded view
|

Re: Too few memory segments provided exception

vishnuviswanath
If it might help anyone else, I had a similar issue when running my unit
tests,
I could solve it by increasing memory of sbt

export SBT_OPTS="-Xmx3G -XX:+UseConcMarkSweepGC
-XX:+CMSClassUnloadingEnabled -Xss1G"



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/