Flink hanging between job executions / All Pairs Shortest Paths

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink hanging between job executions / All Pairs Shortest Paths

Mihail Vieru
Hi all,

I've got a problem when running the attached APSPNaiveJob on a graph with just 1000 vertices (local execution; 0.9-SNAPSHOT).
It solves the AllPairsShortestPaths problem the naive way - executing SingleSourceShortestPaths n times - and storing the computed distances in a distance vector for each vertex.

The problem is that Flink almost comes to a standstill when it reaches 20th iteration, i.e. computing SSSP with srcVertexId = 20. The net runtime is becoming increasingly larger than the total runtime by each iteration, Flink hanging between executions.

I didn't have this problem when each vertex didn't contain a distance vector, but just one distance value. It ran SSSP 1000 times without any issues.

The loop:

        while (srcVertexId < numOfVertices) {
            System.out.println("!!! Executing SSSP for srcVertexId = " + srcVertexId);
           
            graph = graph.run(new APSP<Long>(srcVertexId, maxIterations));

            graph.getVertices().print();

            intermediateResult = env.execute("APSPNaive");
            jobRuntime += intermediateResult.getNetRuntime();

            srcVertexId++;
        }


And the program arguments (first being srcVertexId and second numOfVertices used in the loop):

0 30 /home/vieru/dev/flink-experiments/data/social_network.verticeslistwweights-1k2 /home/vieru/dev/flink-experiments/data/social_network.edgelist-1k /home/vieru/dev/flink-experiments/sssp-output-x-higgstwitter 10

Do you know what could cause this problem?

I would greatly appreciate any help.

Best,
Mihail

APSPNaiveJob.java (5K) Download Attachment
APSPData.java (2K) Download Attachment
APSP.java (5K) Download Attachment
social_network.verticeslistwweights-1k2 (2M) Download Attachment
social_network.edgelist-1k (76K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Flink hanging between job executions / All Pairs Shortest Paths

Stephan Ewen
I think this is a good case where loops in the program can cause issues right now.

The next graph always depends on the previous graph. This is a bit like a recursive definition. In the 10th iteration, in order to execute the print() command, you need to compute the 9th graph, which requires the 8th graph, ...
It is like the inefficient recursive way of computing the Fibonacci Numbers.

The only way go get around that is actually strictly caching the intermediate data set. Flink sill support that internally a few weeks (lets see if it is in time for 0.9, may not). Until then, you need to explicitly persist the graph after each loop iteration.


On Wed, May 13, 2015 at 2:45 PM, Mihail Vieru <[hidden email]> wrote:
Hi all,

I've got a problem when running the attached APSPNaiveJob on a graph with just 1000 vertices (local execution; 0.9-SNAPSHOT).
It solves the AllPairsShortestPaths problem the naive way - executing SingleSourceShortestPaths n times - and storing the computed distances in a distance vector for each vertex.

The problem is that Flink almost comes to a standstill when it reaches 20th iteration, i.e. computing SSSP with srcVertexId = 20. The net runtime is becoming increasingly larger than the total runtime by each iteration, Flink hanging between executions.

I didn't have this problem when each vertex didn't contain a distance vector, but just one distance value. It ran SSSP 1000 times without any issues.

The loop:

        while (srcVertexId < numOfVertices) {
            System.out.println("!!! Executing SSSP for srcVertexId = " + srcVertexId);
           
            graph = graph.run(new APSP<Long>(srcVertexId, maxIterations));

            graph.getVertices().print();

            intermediateResult = env.execute("APSPNaive");
            jobRuntime += intermediateResult.getNetRuntime();

            srcVertexId++;
        }


And the program arguments (first being srcVertexId and second numOfVertices used in the loop):

0 30 /home/vieru/dev/flink-experiments/data/social_network.verticeslistwweights-1k2 /home/vieru/dev/flink-experiments/data/social_network.edgelist-1k /home/vieru/dev/flink-experiments/sssp-output-x-higgstwitter 10

Do you know what could cause this problem?

I would greatly appreciate any help.

Best,
Mihail

Reply | Threaded
Open this post in threaded view
|

Re: Flink hanging between job executions / All Pairs Shortest Paths

Stephan Ewen
BTW, you should be able to see that when, instead of executing the program, you print the execution plan.

I am not sure where the hang comes from. Is it an actual hang, or does it just take long? If it is a hang, does it occur in the optimizer, or in the distributed runtime?


On Wed, May 13, 2015 at 3:00 PM, Stephan Ewen <[hidden email]> wrote:
I think this is a good case where loops in the program can cause issues right now.

The next graph always depends on the previous graph. This is a bit like a recursive definition. In the 10th iteration, in order to execute the print() command, you need to compute the 9th graph, which requires the 8th graph, ...
It is like the inefficient recursive way of computing the Fibonacci Numbers.

The only way go get around that is actually strictly caching the intermediate data set. Flink sill support that internally a few weeks (lets see if it is in time for 0.9, may not). Until then, you need to explicitly persist the graph after each loop iteration.


On Wed, May 13, 2015 at 2:45 PM, Mihail Vieru <[hidden email]> wrote:
Hi all,

I've got a problem when running the attached APSPNaiveJob on a graph with just 1000 vertices (local execution; 0.9-SNAPSHOT).
It solves the AllPairsShortestPaths problem the naive way - executing SingleSourceShortestPaths n times - and storing the computed distances in a distance vector for each vertex.

The problem is that Flink almost comes to a standstill when it reaches 20th iteration, i.e. computing SSSP with srcVertexId = 20. The net runtime is becoming increasingly larger than the total runtime by each iteration, Flink hanging between executions.

I didn't have this problem when each vertex didn't contain a distance vector, but just one distance value. It ran SSSP 1000 times without any issues.

The loop:

        while (srcVertexId < numOfVertices) {
            System.out.println("!!! Executing SSSP for srcVertexId = " + srcVertexId);
           
            graph = graph.run(new APSP<Long>(srcVertexId, maxIterations));

            graph.getVertices().print();

            intermediateResult = env.execute("APSPNaive");
            jobRuntime += intermediateResult.getNetRuntime();

            srcVertexId++;
        }


And the program arguments (first being srcVertexId and second numOfVertices used in the loop):

0 30 /home/vieru/dev/flink-experiments/data/social_network.verticeslistwweights-1k2 /home/vieru/dev/flink-experiments/data/social_network.edgelist-1k /home/vieru/dev/flink-experiments/sssp-output-x-higgstwitter 10

Do you know what could cause this problem?

I would greatly appreciate any help.

Best,
Mihail


Reply | Threaded
Open this post in threaded view
|

Re: Flink hanging between job executions / All Pairs Shortest Paths

Mihail Vieru
Thanks for the quick answer! I will try the explicit persistence workaround.

Can you give me a more precise estimate for the internal caching support? Will it take more than 2 weeks?

It isn't an actual hang. It waits a lot before starting the next iteration after the 20th or so iteration.

Cheers,
Mihail

On 13.05.2015 15:01, Stephan Ewen wrote:
BTW, you should be able to see that when, instead of executing the program, you print the execution plan.

I am not sure where the hang comes from. Is it an actual hang, or does it just take long? If it is a hang, does it occur in the optimizer, or in the distributed runtime?


On Wed, May 13, 2015 at 3:00 PM, Stephan Ewen <[hidden email]> wrote:
I think this is a good case where loops in the program can cause issues right now.

The next graph always depends on the previous graph. This is a bit like a recursive definition. In the 10th iteration, in order to execute the print() command, you need to compute the 9th graph, which requires the 8th graph, ...
It is like the inefficient recursive way of computing the Fibonacci Numbers.

The only way go get around that is actually strictly caching the intermediate data set. Flink sill support that internally a few weeks (lets see if it is in time for 0.9, may not). Until then, you need to explicitly persist the graph after each loop iteration.


On Wed, May 13, 2015 at 2:45 PM, Mihail Vieru <[hidden email]> wrote:
Hi all,

I've got a problem when running the attached APSPNaiveJob on a graph with just 1000 vertices (local execution; 0.9-SNAPSHOT).
It solves the AllPairsShortestPaths problem the naive way - executing SingleSourceShortestPaths n times - and storing the computed distances in a distance vector for each vertex.

The problem is that Flink almost comes to a standstill when it reaches 20th iteration, i.e. computing SSSP with srcVertexId = 20. The net runtime is becoming increasingly larger than the total runtime by each iteration, Flink hanging between executions.

I didn't have this problem when each vertex didn't contain a distance vector, but just one distance value. It ran SSSP 1000 times without any issues.

The loop:

        while (srcVertexId < numOfVertices) {
            System.out.println("!!! Executing SSSP for srcVertexId = " + srcVertexId);
           
            graph = graph.run(new APSP<Long>(srcVertexId, maxIterations));

            graph.getVertices().print();

            intermediateResult = env.execute("APSPNaive");
            jobRuntime += intermediateResult.getNetRuntime();

            srcVertexId++;
        }


And the program arguments (first being srcVertexId and second numOfVertices used in the loop):

0 30 /home/vieru/dev/flink-experiments/data/social_network.verticeslistwweights-1k2 /home/vieru/dev/flink-experiments/data/social_network.edgelist-1k /home/vieru/dev/flink-experiments/sssp-output-x-higgstwitter 10

Do you know what could cause this problem?

I would greatly appreciate any help.

Best,
Mihail