How to perform efficient DataSet reuse between iterations

classic Classic list List threaded Threaded
12 messages Options
Reply | Threaded
Open this post in threaded view
|

How to perform efficient DataSet reuse between iterations

Miguel Coimbra
Hello,

I'm facing a problem in an algorithm where I would like to constantly update a DataSet representing a graph, perform some computation, output one or more DataSink (such as a file on the local system) and then reuse the DataSet for a next iteration.
​I want to avoid spilling the results to disk at the end of an iteration and to read it back in the next iterations - the graph is very big and I do not wish to incur that time overhead.
I want to reuse the full result DataSet of each iteration in the next one and I want to save to disk a small percentage of the produced DataSet for each iteration.
The space complexity is rather constant - the number of edges in the graph increases by only 100 between iterations (which is an extremely low percentage of the original graph's edges) and is obtained using env.fromCollection(edgesToAdd).

Although I am using Flink's Gelly API for graphs, I have no problem working directly with the underlying vertex and edge DataSet elements.​

Two ways to do this occur to me, but it seems both are currently not supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion [1]:

«​
Unfortunately, it is not currently possible to output intermediate results from a bulk iteration.
You can only output the final result at the end of the iteration.
Also, as you correctly noticed, Flink cannot efficiently unroll a while-loop or for-loop, so that won't work either.»


1. I thought I could create a bulk iteration, perform the computation and between iterations, output the result to the file system.
However, this is not possible, as per Vasia's answer, and produces the following exception on execution when I try (for example, to calculate a centrality metric for every vertex and dump the results to disk), as expected based on that information:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

2. Using a for loop in my own program and triggering sequential Flink job executions.
Problem: in this scenario, while I am able to use a DataSet produced in an iteration's Flink job (and dump the desired output information to disk) and pass it to the next Flink job, the computation time increases constantly:
(I also tried manually starting a session which is kept open with env.startNewSession() before the loop - no impact)

Initial graph has 33511 vertices and 411578 edges.
Added 113 vertices and 100 edges.
1-th graph now has 33524 vertices and 411678 edges (2.543 s).
1-th graph algorithm produced 33524 elements. (20.96 s).
Added 222 vertices and 200 edges.
2-th graph now has 33536 vertices and 411778 edges (1.919 s).
2-th
graph algorithm produced 33536 elements. (35.913 s).
Added 326 vertices and 300 edges.
3-th graph now has 33543 vertices and 411878 edges (1.825 s).
3-th
graph algorithm produced 33543 elements. (49.624 s).
Added 436 vertices and 400 edges.
4-th graph now has 33557 vertices and 411978 edges (1.482 s).
4-th
graph algorithm produced 33557 elements. (66.209 s).

Note that the number of elements in the output DataSet is equal to the number of vertices in the graph.
On iteration i in my program, the executed graph algorithm incorporates the result DataSet of iteration i - 1 by means of the g.joinWithVertices(previousResultDataSet, new RanksJoinFunction()) function.

The VertexJoinFunction is a simple forwarding mechanism to set the previous values:

@FunctionAnnotation.ForwardedFieldsFirst("*->*")
private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
    @Override
    public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
        return inputValue;
    }
}


​I have also used Flink's plan visualizer to check for discrepancies between the first iteration and the tenth (for example), but the layout of the plan remains exactly the same while the execution time continually increases for what should be the same amount of computations.

Bottom-line: ​I was hoping someone could tell me how to overcome the performance bottleneck using the sequential job approach or enabling the output of intermediate results using Flink's Bulk Iterations.
​​​I believe others have stumbled upon this limitation before [2, 3].​​
I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit using a local environment:

final Configuration conf = new Configuration();
final LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
final ExecutionEnvironment env = lenv;
env.getConfig().disableSysoutLogging().setParallelism(1);


​I wish to ​execute in a cluster later on with a bigger dataset, so it would be essential that to maximize the ability to reuse the DataSets that are distributed by the Flink runtime.
This would allow me to avoid the performance bottleneck that I described.
​Hopefully someone may shed light on this.​

​Thanks for your attention.​


​References:​




Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra

Reply | Threaded
Open this post in threaded view
|

Re: How to perform efficient DataSet reuse between iterations

Fabian Hueske-2
Hi Miguel,

I'm sorry but AFAIK, the situation has not changed.

Is it possible that you are calling execute() multiple times?
In that case, the 1-st and 2-nd graph would be recomputed before the 3-rd graph is computed.
That would explain the increasing execution time of 15 seconds.

Best, Fabian

2017-11-26 17:45 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

I'm facing a problem in an algorithm where I would like to constantly update a DataSet representing a graph, perform some computation, output one or more DataSink (such as a file on the local system) and then reuse the DataSet for a next iteration.
​I want to avoid spilling the results to disk at the end of an iteration and to read it back in the next iterations - the graph is very big and I do not wish to incur that time overhead.
I want to reuse the full result DataSet of each iteration in the next one and I want to save to disk a small percentage of the produced DataSet for each iteration.
The space complexity is rather constant - the number of edges in the graph increases by only 100 between iterations (which is an extremely low percentage of the original graph's edges) and is obtained using env.fromCollection(edgesToAdd).

Although I am using Flink's Gelly API for graphs, I have no problem working directly with the underlying vertex and edge DataSet elements.​

Two ways to do this occur to me, but it seems both are currently not supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion [1]:

«​
Unfortunately, it is not currently possible to output intermediate results from a bulk iteration.
You can only output the final result at the end of the iteration.
Also, as you correctly noticed, Flink cannot efficiently unroll a while-loop or for-loop, so that won't work either.»


1. I thought I could create a bulk iteration, perform the computation and between iterations, output the result to the file system.
However, this is not possible, as per Vasia's answer, and produces the following exception on execution when I try (for example, to calculate a centrality metric for every vertex and dump the results to disk), as expected based on that information:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

2. Using a for loop in my own program and triggering sequential Flink job executions.
Problem: in this scenario, while I am able to use a DataSet produced in an iteration's Flink job (and dump the desired output information to disk) and pass it to the next Flink job, the computation time increases constantly:
(I also tried manually starting a session which is kept open with env.startNewSession() before the loop - no impact)

Initial graph has 33511 vertices and 411578 edges.
Added 113 vertices and 100 edges.
1-th graph now has 33524 vertices and 411678 edges (2.543 s).
1-th graph algorithm produced 33524 elements. (20.96 s).
Added 222 vertices and 200 edges.
2-th graph now has 33536 vertices and 411778 edges (1.919 s).
2-th
graph algorithm produced 33536 elements. (35.913 s).
Added 326 vertices and 300 edges.
3-th graph now has 33543 vertices and 411878 edges (1.825 s).
3-th
graph algorithm produced 33543 elements. (49.624 s).
Added 436 vertices and 400 edges.
4-th graph now has 33557 vertices and 411978 edges (1.482 s).
4-th
graph algorithm produced 33557 elements. (66.209 s).

Note that the number of elements in the output DataSet is equal to the number of vertices in the graph.
On iteration i in my program, the executed graph algorithm incorporates the result DataSet of iteration i - 1 by means of the g.joinWithVertices(previousResultDataSet, new RanksJoinFunction()) function.

The VertexJoinFunction is a simple forwarding mechanism to set the previous values:

@FunctionAnnotation.ForwardedFieldsFirst("*->*")
private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
    @Override
    public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
        return inputValue;
    }
}


​I have also used Flink's plan visualizer to check for discrepancies between the first iteration and the tenth (for example), but the layout of the plan remains exactly the same while the execution time continually increases for what should be the same amount of computations.

Bottom-line: ​I was hoping someone could tell me how to overcome the performance bottleneck using the sequential job approach or enabling the output of intermediate results using Flink's Bulk Iterations.
​​​I believe others have stumbled upon this limitation before [2, 3].​​
I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit using a local environment:

final Configuration conf = new Configuration();
final LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
final ExecutionEnvironment env = lenv;
env.getConfig().disableSysoutLogging().setParallelism(1);


​I wish to ​execute in a cluster later on with a bigger dataset, so it would be essential that to maximize the ability to reuse the DataSets that are distributed by the Flink runtime.
This would allow me to avoid the performance bottleneck that I described.
​Hopefully someone may shed light on this.​

​Thanks for your attention.​


​References:​




Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


Reply | Threaded
Open this post in threaded view
|

Re: How to perform efficient DataSet reuse between iterations

Miguel Coimbra
Hello Fabian,

Thank you for the reply.
I was hoping the situation had in fact changed.

As far as I know, I am not calling execute() directly even once - it is being called implicitly by simple DataSink elements added to the plan through count():

System.out.println(String.format("%d-th graph algorithm produced %d elements. (%d.%d s).",
                            executionCounter,
                            result.count(), // this would trigger execution...
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS),
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS) % 1000));


I have taken a look at Flink's code base (e.g. how the dataflow dag is processed with classes such as  org.apache.flink.optimizer.traversals.GraphCreatingVisitor, org.apache.flink.api.java.operators.OperatorTranslation) but I'm not sure on the most direct way to achieve this.
Perhaps I missed some online documentation that would help to get a grip on how to contribute to the different parts of Flink?

I did find some information which hints at implementing this sort of thing (such as adding custom operators) but it was associated to an old version of Flink:
However, as far as I know there is no equivalent page in the current online stable or snapshot documentation.

What would be the best way to go about this?

It really seems that the DataSet stored in the result variable is always representing an increasing sequence of executions and not just the results of the last execution.



Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 27 November 2017 at 22:56, Fabian Hueske <[hidden email]> wrote:
Hi Miguel,

I'm sorry but AFAIK, the situation has not changed.

Is it possible that you are calling execute() multiple times?
In that case, the 1-st and 2-nd graph would be recomputed before the 3-rd graph is computed.
That would explain the increasing execution time of 15 seconds.

Best, Fabian

2017-11-26 17:45 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

I'm facing a problem in an algorithm where I would like to constantly update a DataSet representing a graph, perform some computation, output one or more DataSink (such as a file on the local system) and then reuse the DataSet for a next iteration.
​I want to avoid spilling the results to disk at the end of an iteration and to read it back in the next iterations - the graph is very big and I do not wish to incur that time overhead.
I want to reuse the full result DataSet of each iteration in the next one and I want to save to disk a small percentage of the produced DataSet for each iteration.
The space complexity is rather constant - the number of edges in the graph increases by only 100 between iterations (which is an extremely low percentage of the original graph's edges) and is obtained using env.fromCollection(edgesToAdd).

Although I am using Flink's Gelly API for graphs, I have no problem working directly with the underlying vertex and edge DataSet elements.​

Two ways to do this occur to me, but it seems both are currently not supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion [1]:

«​
Unfortunately, it is not currently possible to output intermediate results from a bulk iteration.
You can only output the final result at the end of the iteration.
Also, as you correctly noticed, Flink cannot efficiently unroll a while-loop or for-loop, so that won't work either.»


1. I thought I could create a bulk iteration, perform the computation and between iterations, output the result to the file system.
However, this is not possible, as per Vasia's answer, and produces the following exception on execution when I try (for example, to calculate a centrality metric for every vertex and dump the results to disk), as expected based on that information:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

2. Using a for loop in my own program and triggering sequential Flink job executions.
Problem: in this scenario, while I am able to use a DataSet produced in an iteration's Flink job (and dump the desired output information to disk) and pass it to the next Flink job, the computation time increases constantly:
(I also tried manually starting a session which is kept open with env.startNewSession() before the loop - no impact)

​​
Initial graph has 33511 vertices and 411578 edges.
Added 113 vertices and 100 edges.
1-th graph now has 33524 vertices and 411678 edges (2.543 s).
1-th graph algorithm produced 33524 elements. (20.96 s).
Added 222 vertices and 200 edges.
2-th graph now has 33536 vertices and 411778 edges (1.919 s).
2-th
graph algorithm produced 33536 elements. (35.913 s).
Added 326 vertices and 300 edges.
3-th graph now has 33543 vertices and 411878 edges (1.825 s).
3-th
graph algorithm produced 33543 elements. (49.624 s).
Added 436 vertices and 400 edges.
4-th graph now has 33557 vertices and 411978 edges (1.482 s).
4-th
graph algorithm produced 33557 elements. (66.209 s).

Note that the number of elements in the output DataSet is equal to the number of vertices in the graph.
On iteration i in my program, the executed graph algorithm incorporates the result DataSet of iteration i - 1 by means of the g.joinWithVertices(previousResultDataSet, new RanksJoinFunction()) function.

The VertexJoinFunction is a simple forwarding mechanism to set the previous values:

@FunctionAnnotation.ForwardedFieldsFirst("*->*")
private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
    @Override
    public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
        return inputValue;
    }
}


​I have also used Flink's plan visualizer to check for discrepancies between the first iteration and the tenth (for example), but the layout of the plan remains exactly the same while the execution time continually increases for what should be the same amount of computations.

Bottom-line: ​I was hoping someone could tell me how to overcome the performance bottleneck using the sequential job approach or enabling the output of intermediate results using Flink's Bulk Iterations.
​​​I believe others have stumbled upon this limitation before [2, 3].​​
I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit using a local environment:

final Configuration conf = new Configuration();
final LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
final ExecutionEnvironment env = lenv;
env.getConfig().disableSysoutLogging().setParallelism(1);


​I wish to ​execute in a cluster later on with a bigger dataset, so it would be essential that to maximize the ability to reuse the DataSets that are distributed by the Flink runtime.
This would allow me to avoid the performance bottleneck that I described.
​Hopefully someone may shed light on this.​

​Thanks for your attention.​


​References:​




Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra



Reply | Threaded
Open this post in threaded view
|

Re: How to perform efficient DataSet reuse between iterations

Fabian Hueske-2
Hi,

by calling result.count(), you compute the complete plan from the beginning and not just the operations you added since the last execution.
Looking at the output you posted, each step takes about 15 seconds (with about 5 secs of initialization).
So the 20 seconds of the first step include initialization + 1st step.
The 35 seconds on the second step include initialization, 1st step + 2nd step.
If you don't call count on the intermediate steps, you can compute the 4th step in 65 seconds.

Implementing a caching operator would be a pretty huge effort because you need to touch code at many places such as the API, optimizer, runtime, scheduling, etc.
The documentation you found should still be applicable. There hasn't been major additions to the DataSet API and runtime in the last releases.

Best, Fabian



2017-11-28 9:14 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thank you for the reply.
I was hoping the situation had in fact changed.

As far as I know, I am not calling execute() directly even once - it is being called implicitly by simple DataSink elements added to the plan through count():

System.out.println(String.format("%d-th graph algorithm produced %d elements. (%d.%d s).",
                            executionCounter,
                            result.count(), // this would trigger execution...
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS),
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS) % 1000));


I have taken a look at Flink's code base (e.g. how the dataflow dag is processed with classes such as  org.apache.flink.optimizer.traversals.GraphCreatingVisitor, org.apache.flink.api.java.operators.OperatorTranslation) but I'm not sure on the most direct way to achieve this.
Perhaps I missed some online documentation that would help to get a grip on how to contribute to the different parts of Flink?

I did find some information which hints at implementing this sort of thing (such as adding custom operators) but it was associated to an old version of Flink:
However, as far as I know there is no equivalent page in the current online stable or snapshot documentation.

What would be the best way to go about this?

It really seems that the DataSet stored in the result variable is always representing an increasing sequence of executions and not just the results of the last execution.



Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 27 November 2017 at 22:56, Fabian Hueske <[hidden email]> wrote:
Hi Miguel,

I'm sorry but AFAIK, the situation has not changed.

Is it possible that you are calling execute() multiple times?
In that case, the 1-st and 2-nd graph would be recomputed before the 3-rd graph is computed.
That would explain the increasing execution time of 15 seconds.

Best, Fabian

2017-11-26 17:45 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

I'm facing a problem in an algorithm where I would like to constantly update a DataSet representing a graph, perform some computation, output one or more DataSink (such as a file on the local system) and then reuse the DataSet for a next iteration.
​I want to avoid spilling the results to disk at the end of an iteration and to read it back in the next iterations - the graph is very big and I do not wish to incur that time overhead.
I want to reuse the full result DataSet of each iteration in the next one and I want to save to disk a small percentage of the produced DataSet for each iteration.
The space complexity is rather constant - the number of edges in the graph increases by only 100 between iterations (which is an extremely low percentage of the original graph's edges) and is obtained using env.fromCollection(edgesToAdd).

Although I am using Flink's Gelly API for graphs, I have no problem working directly with the underlying vertex and edge DataSet elements.​

Two ways to do this occur to me, but it seems both are currently not supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion [1]:

«​
Unfortunately, it is not currently possible to output intermediate results from a bulk iteration.
You can only output the final result at the end of the iteration.
Also, as you correctly noticed, Flink cannot efficiently unroll a while-loop or for-loop, so that won't work either.»


1. I thought I could create a bulk iteration, perform the computation and between iterations, output the result to the file system.
However, this is not possible, as per Vasia's answer, and produces the following exception on execution when I try (for example, to calculate a centrality metric for every vertex and dump the results to disk), as expected based on that information:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

2. Using a for loop in my own program and triggering sequential Flink job executions.
Problem: in this scenario, while I am able to use a DataSet produced in an iteration's Flink job (and dump the desired output information to disk) and pass it to the next Flink job, the computation time increases constantly:
(I also tried manually starting a session which is kept open with env.startNewSession() before the loop - no impact)

​​
Initial graph has 33511 vertices and 411578 edges.
Added 113 vertices and 100 edges.
1-th graph now has 33524 vertices and 411678 edges (2.543 s).
1-th graph algorithm produced 33524 elements. (20.96 s).
Added 222 vertices and 200 edges.
2-th graph now has 33536 vertices and 411778 edges (1.919 s).
2-th
graph algorithm produced 33536 elements. (35.913 s).
Added 326 vertices and 300 edges.
3-th graph now has 33543 vertices and 411878 edges (1.825 s).
3-th
graph algorithm produced 33543 elements. (49.624 s).
Added 436 vertices and 400 edges.
4-th graph now has 33557 vertices and 411978 edges (1.482 s).
4-th
graph algorithm produced 33557 elements. (66.209 s).

Note that the number of elements in the output DataSet is equal to the number of vertices in the graph.
On iteration i in my program, the executed graph algorithm incorporates the result DataSet of iteration i - 1 by means of the g.joinWithVertices(previousResultDataSet, new RanksJoinFunction()) function.

The VertexJoinFunction is a simple forwarding mechanism to set the previous values:

@FunctionAnnotation.ForwardedFieldsFirst("*->*")
private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
    @Override
    public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
        return inputValue;
    }
}


​I have also used Flink's plan visualizer to check for discrepancies between the first iteration and the tenth (for example), but the layout of the plan remains exactly the same while the execution time continually increases for what should be the same amount of computations.

Bottom-line: ​I was hoping someone could tell me how to overcome the performance bottleneck using the sequential job approach or enabling the output of intermediate results using Flink's Bulk Iterations.
​​​I believe others have stumbled upon this limitation before [2, 3].​​
I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit using a local environment:

final Configuration conf = new Configuration();
final LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
final ExecutionEnvironment env = lenv;
env.getConfig().disableSysoutLogging().setParallelism(1);


​I wish to ​execute in a cluster later on with a bigger dataset, so it would be essential that to maximize the ability to reuse the DataSets that are distributed by the Flink runtime.
This would allow me to avoid the performance bottleneck that I described.
​Hopefully someone may shed light on this.​

​Thanks for your attention.​


​References:​




Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra




Reply | Threaded
Open this post in threaded view
|

Re: How to perform efficient DataSet reuse between iterations

Miguel Coimbra
Hello,

You're right, I was overlooking that.
With your suggestion, I now just define a different sink in each iteration of the loop.
Then they all output to disk when executing a single bigger plan.

I have one more question: I know I can retrieve the total time this single job takes to execute, but what if I want to know the time taken for specific operators in the dag?
Is there some functionality in the Flink Batch API akin to counting elements but for measuring time instead?
For example, if I am not mistaken, an operator can be executed in parallel or serially (with a parallelism of one).

Is there a straightforward way to get the time taken by the operator's tasks?
In a way that I could:

a) just get the time of a single task (if running serially) to get the total operator execution time;
b) know the time taken by each parallel component of the operator's execution so I could know where and what was the "lagging element" in the operator's execution.

Is this possible? I was hoping I could retrieve this information in the Java program itself and avoid processing logs.

Thanks again.

Best regards,


Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 28 November 2017 at 08:56, Fabian Hueske <[hidden email]> wrote:
Hi,

by calling result.count(), you compute the complete plan from the beginning and not just the operations you added since the last execution.
Looking at the output you posted, each step takes about 15 seconds (with about 5 secs of initialization).
So the 20 seconds of the first step include initialization + 1st step.
The 35 seconds on the second step include initialization, 1st step + 2nd step.
If you don't call count on the intermediate steps, you can compute the 4th step in 65 seconds.

Implementing a caching operator would be a pretty huge effort because you need to touch code at many places such as the API, optimizer, runtime, scheduling, etc.
The documentation you found should still be applicable. There hasn't been major additions to the DataSet API and runtime in the last releases.

Best, Fabian



2017-11-28 9:14 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thank you for the reply.
I was hoping the situation had in fact changed.

As far as I know, I am not calling execute() directly even once - it is being called implicitly by simple DataSink elements added to the plan through count():

System.out.println(String.format("%d-th graph algorithm produced %d elements. (%d.%d s).",
                            executionCounter,
                            result.count(), // this would trigger execution...
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS),
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS) % 1000));


I have taken a look at Flink's code base (e.g. how the dataflow dag is processed with classes such as  org.apache.flink.optimizer.traversals.GraphCreatingVisitor, org.apache.flink.api.java.operators.OperatorTranslation) but I'm not sure on the most direct way to achieve this.
Perhaps I missed some online documentation that would help to get a grip on how to contribute to the different parts of Flink?

I did find some information which hints at implementing this sort of thing (such as adding custom operators) but it was associated to an old version of Flink:
However, as far as I know there is no equivalent page in the current online stable or snapshot documentation.

What would be the best way to go about this?

It really seems that the DataSet stored in the result variable is always representing an increasing sequence of executions and not just the results of the last execution.



Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 27 November 2017 at 22:56, Fabian Hueske <[hidden email]> wrote:
Hi Miguel,

I'm sorry but AFAIK, the situation has not changed.

Is it possible that you are calling execute() multiple times?
In that case, the 1-st and 2-nd graph would be recomputed before the 3-rd graph is computed.
That would explain the increasing execution time of 15 seconds.

Best, Fabian

2017-11-26 17:45 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

I'm facing a problem in an algorithm where I would like to constantly update a DataSet representing a graph, perform some computation, output one or more DataSink (such as a file on the local system) and then reuse the DataSet for a next iteration.
​I want to avoid spilling the results to disk at the end of an iteration and to read it back in the next iterations - the graph is very big and I do not wish to incur that time overhead.
I want to reuse the full result DataSet of each iteration in the next one and I want to save to disk a small percentage of the produced DataSet for each iteration.
The space complexity is rather constant - the number of edges in the graph increases by only 100 between iterations (which is an extremely low percentage of the original graph's edges) and is obtained using env.fromCollection(edgesToAdd).

Although I am using Flink's Gelly API for graphs, I have no problem working directly with the underlying vertex and edge DataSet elements.​

Two ways to do this occur to me, but it seems both are currently not supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion [1]:

«​
Unfortunately, it is not currently possible to output intermediate results from a bulk iteration.
You can only output the final result at the end of the iteration.
Also, as you correctly noticed, Flink cannot efficiently unroll a while-loop or for-loop, so that won't work either.»


1. I thought I could create a bulk iteration, perform the computation and between iterations, output the result to the file system.
However, this is not possible, as per Vasia's answer, and produces the following exception on execution when I try (for example, to calculate a centrality metric for every vertex and dump the results to disk), as expected based on that information:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

2. Using a for loop in my own program and triggering sequential Flink job executions.
Problem: in this scenario, while I am able to use a DataSet produced in an iteration's Flink job (and dump the desired output information to disk) and pass it to the next Flink job, the computation time increases constantly:
(I also tried manually starting a session which is kept open with env.startNewSession() before the loop - no impact)

​​
Initial graph has 33511 vertices and 411578 edges.
Added 113 vertices and 100 edges.
1-th graph now has 33524 vertices and 411678 edges (2.543 s).
1-th graph algorithm produced 33524 elements. (20.96 s).
Added 222 vertices and 200 edges.
2-th graph now has 33536 vertices and 411778 edges (1.919 s).
2-th
graph algorithm produced 33536 elements. (35.913 s).
Added 326 vertices and 300 edges.
3-th graph now has 33543 vertices and 411878 edges (1.825 s).
3-th
graph algorithm produced 33543 elements. (49.624 s).
Added 436 vertices and 400 edges.
4-th graph now has 33557 vertices and 411978 edges (1.482 s).
4-th
graph algorithm produced 33557 elements. (66.209 s).

Note that the number of elements in the output DataSet is equal to the number of vertices in the graph.
On iteration i in my program, the executed graph algorithm incorporates the result DataSet of iteration i - 1 by means of the g.joinWithVertices(previousResultDataSet, new RanksJoinFunction()) function.

The VertexJoinFunction is a simple forwarding mechanism to set the previous values:

@FunctionAnnotation.ForwardedFieldsFirst("*->*")
private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
    @Override
    public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
        return inputValue;
    }
}


​I have also used Flink's plan visualizer to check for discrepancies between the first iteration and the tenth (for example), but the layout of the plan remains exactly the same while the execution time continually increases for what should be the same amount of computations.

Bottom-line: ​I was hoping someone could tell me how to overcome the performance bottleneck using the sequential job approach or enabling the output of intermediate results using Flink's Bulk Iterations.
​​​I believe others have stumbled upon this limitation before [2, 3].​​
I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit using a local environment:

final Configuration conf = new Configuration();
final LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
final ExecutionEnvironment env = lenv;
env.getConfig().disableSysoutLogging().setParallelism(1);


​I wish to ​execute in a cluster later on with a bigger dataset, so it would be essential that to maximize the ability to reuse the DataSets that are distributed by the Flink runtime.
This would allow me to avoid the performance bottleneck that I described.
​Hopefully someone may shed light on this.​

​Thanks for your attention.​


​References:​




Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra





Reply | Threaded
Open this post in threaded view
|

Re: How to perform efficient DataSet reuse between iterations

Fabian Hueske-2
The monitoring REST interface provides detailed stats about a job, its tasks, and processing verticies including their start and end time [1].

However, it is not trivial to make sense of the execution times because Flink uses pipelined shuffles by default.
That means that the execution of multiple operators can overlap. For example the records that are produced by a GroupReduce can be processed by a Map, shuffled, and sorted (for another GroupReduce) in a pipelined fashion.
Hence, all these operations run at the same time. You can disable this behavior to some extend by setting the execution mode to batched shuffles [2].
However, this will likely have a negative impact on the overall execution time.

2017-11-29 0:44 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

You're right, I was overlooking that.
With your suggestion, I now just define a different sink in each iteration of the loop.
Then they all output to disk when executing a single bigger plan.

I have one more question: I know I can retrieve the total time this single job takes to execute, but what if I want to know the time taken for specific operators in the dag?
Is there some functionality in the Flink Batch API akin to counting elements but for measuring time instead?
For example, if I am not mistaken, an operator can be executed in parallel or serially (with a parallelism of one).

Is there a straightforward way to get the time taken by the operator's tasks?
In a way that I could:

a) just get the time of a single task (if running serially) to get the total operator execution time;
b) know the time taken by each parallel component of the operator's execution so I could know where and what was the "lagging element" in the operator's execution.

Is this possible? I was hoping I could retrieve this information in the Java program itself and avoid processing logs.

Thanks again.

Best regards,


Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 28 November 2017 at 08:56, Fabian Hueske <[hidden email]> wrote:
Hi,

by calling result.count(), you compute the complete plan from the beginning and not just the operations you added since the last execution.
Looking at the output you posted, each step takes about 15 seconds (with about 5 secs of initialization).
So the 20 seconds of the first step include initialization + 1st step.
The 35 seconds on the second step include initialization, 1st step + 2nd step.
If you don't call count on the intermediate steps, you can compute the 4th step in 65 seconds.

Implementing a caching operator would be a pretty huge effort because you need to touch code at many places such as the API, optimizer, runtime, scheduling, etc.
The documentation you found should still be applicable. There hasn't been major additions to the DataSet API and runtime in the last releases.

Best, Fabian



2017-11-28 9:14 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thank you for the reply.
I was hoping the situation had in fact changed.

As far as I know, I am not calling execute() directly even once - it is being called implicitly by simple DataSink elements added to the plan through count():

System.out.println(String.format("%d-th graph algorithm produced %d elements. (%d.%d s).",
                            executionCounter,
                            result.count(), // this would trigger execution...
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS),
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS) % 1000));


I have taken a look at Flink's code base (e.g. how the dataflow dag is processed with classes such as  org.apache.flink.optimizer.traversals.GraphCreatingVisitor, org.apache.flink.api.java.operators.OperatorTranslation) but I'm not sure on the most direct way to achieve this.
Perhaps I missed some online documentation that would help to get a grip on how to contribute to the different parts of Flink?

I did find some information which hints at implementing this sort of thing (such as adding custom operators) but it was associated to an old version of Flink:
However, as far as I know there is no equivalent page in the current online stable or snapshot documentation.

What would be the best way to go about this?

It really seems that the DataSet stored in the result variable is always representing an increasing sequence of executions and not just the results of the last execution.



Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 27 November 2017 at 22:56, Fabian Hueske <[hidden email]> wrote:
Hi Miguel,

I'm sorry but AFAIK, the situation has not changed.

Is it possible that you are calling execute() multiple times?
In that case, the 1-st and 2-nd graph would be recomputed before the 3-rd graph is computed.
That would explain the increasing execution time of 15 seconds.

Best, Fabian

2017-11-26 17:45 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

I'm facing a problem in an algorithm where I would like to constantly update a DataSet representing a graph, perform some computation, output one or more DataSink (such as a file on the local system) and then reuse the DataSet for a next iteration.
​I want to avoid spilling the results to disk at the end of an iteration and to read it back in the next iterations - the graph is very big and I do not wish to incur that time overhead.
I want to reuse the full result DataSet of each iteration in the next one and I want to save to disk a small percentage of the produced DataSet for each iteration.
The space complexity is rather constant - the number of edges in the graph increases by only 100 between iterations (which is an extremely low percentage of the original graph's edges) and is obtained using env.fromCollection(edgesToAdd).

Although I am using Flink's Gelly API for graphs, I have no problem working directly with the underlying vertex and edge DataSet elements.​

Two ways to do this occur to me, but it seems both are currently not supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion [1]:

«​
Unfortunately, it is not currently possible to output intermediate results from a bulk iteration.
You can only output the final result at the end of the iteration.
Also, as you correctly noticed, Flink cannot efficiently unroll a while-loop or for-loop, so that won't work either.»


1. I thought I could create a bulk iteration, perform the computation and between iterations, output the result to the file system.
However, this is not possible, as per Vasia's answer, and produces the following exception on execution when I try (for example, to calculate a centrality metric for every vertex and dump the results to disk), as expected based on that information:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

2. Using a for loop in my own program and triggering sequential Flink job executions.
Problem: in this scenario, while I am able to use a DataSet produced in an iteration's Flink job (and dump the desired output information to disk) and pass it to the next Flink job, the computation time increases constantly:
(I also tried manually starting a session which is kept open with env.startNewSession() before the loop - no impact)

​​
Initial graph has 33511 vertices and 411578 edges.
Added 113 vertices and 100 edges.
1-th graph now has 33524 vertices and 411678 edges (2.543 s).
1-th graph algorithm produced 33524 elements. (20.96 s).
Added 222 vertices and 200 edges.
2-th graph now has 33536 vertices and 411778 edges (1.919 s).
2-th
graph algorithm produced 33536 elements. (35.913 s).
Added 326 vertices and 300 edges.
3-th graph now has 33543 vertices and 411878 edges (1.825 s).
3-th
graph algorithm produced 33543 elements. (49.624 s).
Added 436 vertices and 400 edges.
4-th graph now has 33557 vertices and 411978 edges (1.482 s).
4-th
graph algorithm produced 33557 elements. (66.209 s).

Note that the number of elements in the output DataSet is equal to the number of vertices in the graph.
On iteration i in my program, the executed graph algorithm incorporates the result DataSet of iteration i - 1 by means of the g.joinWithVertices(previousResultDataSet, new RanksJoinFunction()) function.

The VertexJoinFunction is a simple forwarding mechanism to set the previous values:

@FunctionAnnotation.ForwardedFieldsFirst("*->*")
private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
    @Override
    public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
        return inputValue;
    }
}


​I have also used Flink's plan visualizer to check for discrepancies between the first iteration and the tenth (for example), but the layout of the plan remains exactly the same while the execution time continually increases for what should be the same amount of computations.

Bottom-line: ​I was hoping someone could tell me how to overcome the performance bottleneck using the sequential job approach or enabling the output of intermediate results using Flink's Bulk Iterations.
​​​I believe others have stumbled upon this limitation before [2, 3].​​
I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit using a local environment:

final Configuration conf = new Configuration();
final LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
final ExecutionEnvironment env = lenv;
env.getConfig().disableSysoutLogging().setParallelism(1);


​I wish to ​execute in a cluster later on with a bigger dataset, so it would be essential that to maximize the ability to reuse the DataSets that are distributed by the Flink runtime.
This would allow me to avoid the performance bottleneck that I described.
​Hopefully someone may shed light on this.​

​Thanks for your attention.​


​References:​




Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra






Reply | Threaded
Open this post in threaded view
|

Re: How to perform efficient DataSet reuse between iterations

Miguel Coimbra
Hello Fabian,

Thanks for the help.
I am interested in the duration of specific operators, so the fact that parts of the execution are in pipeline is not a problem for me.
From my understanding, the automated way to approach this is to run the Flink job with the web interface active and then make a REST call on the appropriate job and parse the JSON.
I have implemented that and my program is able to read the duration of specific operators.

However, I'm facing another problem whose cause I haven't been able to pinpoint.

Testing on 1.4.0-SNAPSHOT.

When launching a cluster (start-local.bat on Windows or start-cluster.sh on Linux), the JobManager, the TaskManager(s) are launched and the web front end becomes active (I can access it via browser) - everything is ok so far.
The problem occurs when the number of operators in the plan increases.

Consider that I execute my algorithm three (3) times through a single Flink plan.
For three times, vertices and edges will be added to the graph (via Gelly methods).
This is defined in a for loop in my program. For each iteration:

// I add 100 edges to the graph, decomposed in a list of vertices and edges
final Graph<Long, NullValue, NullValue> newGraph = graph
    .addVertices(verticesToAdd)
    .addEdges(edgesToAdd);
                   
// Generate identifications for the vertex counter.
final String vid = new AbstractID().toString();
newGraph.getVertices().output(new Utils.CountHelper<Vertex<Long, NullValue>>(vid)).name("count()");
vids.put(executionCounter, vid);

// Generate identifications for the edge counter.
final String eid = new AbstractID().toString();
newGraph.getEdges().output(new Utils.CountHelper<Edge<Long, NullValue>>(eid)).name("count()");
eids.put(executionCounter, eid);


So far I have created 2 sinks in the current iteration in my regular Java program.
Then I execute my graph algorithm with the previous results:

// Execute the graph algorithm and output some of its results
result = newGraph.run(new MyGraphAlgorithm<>(previousResults));
result.sortPartition(1, Order.DESCENDING).setParallelism(1).first(1000).output(outputFormat).name("Store results");
previousResults = result;

This creates one additional sink via the output function.
So for three (3) iterations, I define nine (9) sinks in the plan, call execute() and afterward retrieve the contents of the sinks.
This works fine so far.

If I run with 10 iterations, I will be creating 30 sinks.
The problem is that for 10 iterations, the Flink client program just hangs on the execute() call forever (execution time should increase linearly with the number of iterations).
For 3 iterations, execute() proceeds normally and it takes around 20 seconds per iteration, so 3 iterations is 60 seconds and 10 should be around 3 minutes.
After five hours, there was no progress.

Furthermore, I checked the web monitor periodically and there was not a single job.
It seems that the client program is simply not sending the job to the cluster if the job plan becomes too big.
The exact same compiled program, with 3 iterations (via argument) works, but with 10 (via argument) it simply falls silent.

I am trying to understand what may be the problem:

- An internal limit in the number of datasinks or operators in the plan?

- A limit in message size preventing the client from sending the job?
I have tried increasing the akka.framesize to 256000kB in the Flink server flink-conf.yaml config and in the client program when creating the remote environment with:

Configuration clientConfig = new Configuration();
final String akkaFrameSize = "256000kB";
ConfigOption<String> akkaConfig = ConfigOptions.key(AkkaOptions.FRAMESIZE.key()).defaultValue(akkaFrameSize);
clientConfig.setString(akkaConfig, akkaFrameSize);
env = new RemoteEnvironment(remoteAddress, remotePort, clientConfig, jarFiles, null);


I have run out of ideas with respect to the causes.
Hoping you may be able to shed some light.

Thank you for your time,

Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 29 November 2017 at 10:23, Fabian Hueske <[hidden email]> wrote:
The monitoring REST interface provides detailed stats about a job, its tasks, and processing verticies including their start and end time [1].

However, it is not trivial to make sense of the execution times because Flink uses pipelined shuffles by default.
That means that the execution of multiple operators can overlap. For example the records that are produced by a GroupReduce can be processed by a Map, shuffled, and sorted (for another GroupReduce) in a pipelined fashion.
Hence, all these operations run at the same time. You can disable this behavior to some extend by setting the execution mode to batched shuffles [2].
However, this will likely have a negative impact on the overall execution time.

2017-11-29 0:44 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

You're right, I was overlooking that.
With your suggestion, I now just define a different sink in each iteration of the loop.
Then they all output to disk when executing a single bigger plan.

I have one more question: I know I can retrieve the total time this single job takes to execute, but what if I want to know the time taken for specific operators in the dag?
Is there some functionality in the Flink Batch API akin to counting elements but for measuring time instead?
For example, if I am not mistaken, an operator can be executed in parallel or serially (with a parallelism of one).

Is there a straightforward way to get the time taken by the operator's tasks?
In a way that I could:

a) just get the time of a single task (if running serially) to get the total operator execution time;
b) know the time taken by each parallel component of the operator's execution so I could know where and what was the "lagging element" in the operator's execution.

Is this possible? I was hoping I could retrieve this information in the Java program itself and avoid processing logs.

Thanks again.

Best regards,


Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 28 November 2017 at 08:56, Fabian Hueske <[hidden email]> wrote:
Hi,

by calling result.count(), you compute the complete plan from the beginning and not just the operations you added since the last execution.
Looking at the output you posted, each step takes about 15 seconds (with about 5 secs of initialization).
So the 20 seconds of the first step include initialization + 1st step.
The 35 seconds on the second step include initialization, 1st step + 2nd step.
If you don't call count on the intermediate steps, you can compute the 4th step in 65 seconds.

Implementing a caching operator would be a pretty huge effort because you need to touch code at many places such as the API, optimizer, runtime, scheduling, etc.
The documentation you found should still be applicable. There hasn't been major additions to the DataSet API and runtime in the last releases.

Best, Fabian



2017-11-28 9:14 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thank you for the reply.
I was hoping the situation had in fact changed.

As far as I know, I am not calling execute() directly even once - it is being called implicitly by simple DataSink elements added to the plan through count():

System.out.println(String.format("%d-th graph algorithm produced %d elements. (%d.%d s).",
                            executionCounter,
                            result.count(), // this would trigger execution...
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS),
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS) % 1000));


I have taken a look at Flink's code base (e.g. how the dataflow dag is processed with classes such as  org.apache.flink.optimizer.traversals.GraphCreatingVisitor, org.apache.flink.api.java.operators.OperatorTranslation) but I'm not sure on the most direct way to achieve this.
Perhaps I missed some online documentation that would help to get a grip on how to contribute to the different parts of Flink?

I did find some information which hints at implementing this sort of thing (such as adding custom operators) but it was associated to an old version of Flink:
However, as far as I know there is no equivalent page in the current online stable or snapshot documentation.

What would be the best way to go about this?

It really seems that the DataSet stored in the result variable is always representing an increasing sequence of executions and not just the results of the last execution.



Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 27 November 2017 at 22:56, Fabian Hueske <[hidden email]> wrote:
Hi Miguel,

I'm sorry but AFAIK, the situation has not changed.

Is it possible that you are calling execute() multiple times?
In that case, the 1-st and 2-nd graph would be recomputed before the 3-rd graph is computed.
That would explain the increasing execution time of 15 seconds.

Best, Fabian

2017-11-26 17:45 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

I'm facing a problem in an algorithm where I would like to constantly update a DataSet representing a graph, perform some computation, output one or more DataSink (such as a file on the local system) and then reuse the DataSet for a next iteration.
​I want to avoid spilling the results to disk at the end of an iteration and to read it back in the next iterations - the graph is very big and I do not wish to incur that time overhead.
I want to reuse the full result DataSet of each iteration in the next one and I want to save to disk a small percentage of the produced DataSet for each iteration.
The space complexity is rather constant - the number of edges in the graph increases by only 100 between iterations (which is an extremely low percentage of the original graph's edges) and is obtained using env.fromCollection(edgesToAdd).

Although I am using Flink's Gelly API for graphs, I have no problem working directly with the underlying vertex and edge DataSet elements.​

Two ways to do this occur to me, but it seems both are currently not supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion [1]:

«​
Unfortunately, it is not currently possible to output intermediate results from a bulk iteration.
You can only output the final result at the end of the iteration.
Also, as you correctly noticed, Flink cannot efficiently unroll a while-loop or for-loop, so that won't work either.»


1. I thought I could create a bulk iteration, perform the computation and between iterations, output the result to the file system.
However, this is not possible, as per Vasia's answer, and produces the following exception on execution when I try (for example, to calculate a centrality metric for every vertex and dump the results to disk), as expected based on that information:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

2. Using a for loop in my own program and triggering sequential Flink job executions.
Problem: in this scenario, while I am able to use a DataSet produced in an iteration's Flink job (and dump the desired output information to disk) and pass it to the next Flink job, the computation time increases constantly:
(I also tried manually starting a session which is kept open with env.startNewSession() before the loop - no impact)

​​
Initial graph has 33511 vertices and 411578 edges.
Added 113 vertices and 100 edges.
1-th graph now has 33524 vertices and 411678 edges (2.543 s).
1-th graph algorithm produced 33524 elements. (20.96 s).
Added 222 vertices and 200 edges.
2-th graph now has 33536 vertices and 411778 edges (1.919 s).
2-th
graph algorithm produced 33536 elements. (35.913 s).
Added 326 vertices and 300 edges.
3-th graph now has 33543 vertices and 411878 edges (1.825 s).
3-th
graph algorithm produced 33543 elements. (49.624 s).
Added 436 vertices and 400 edges.
4-th graph now has 33557 vertices and 411978 edges (1.482 s).
4-th
graph algorithm produced 33557 elements. (66.209 s).

Note that the number of elements in the output DataSet is equal to the number of vertices in the graph.
On iteration i in my program, the executed graph algorithm incorporates the result DataSet of iteration i - 1 by means of the g.joinWithVertices(previousResultDataSet, new RanksJoinFunction()) function.

The VertexJoinFunction is a simple forwarding mechanism to set the previous values:

@FunctionAnnotation.ForwardedFieldsFirst("*->*")
private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
    @Override
    public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
        return inputValue;
    }
}


​I have also used Flink's plan visualizer to check for discrepancies between the first iteration and the tenth (for example), but the layout of the plan remains exactly the same while the execution time continually increases for what should be the same amount of computations.

Bottom-line: ​I was hoping someone could tell me how to overcome the performance bottleneck using the sequential job approach or enabling the output of intermediate results using Flink's Bulk Iterations.
​​​I believe others have stumbled upon this limitation before [2, 3].​​
I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit using a local environment:

final Configuration conf = new Configuration();
final LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
final ExecutionEnvironment env = lenv;
env.getConfig().disableSysoutLogging().setParallelism(1);


​I wish to ​execute in a cluster later on with a bigger dataset, so it would be essential that to maximize the ability to reuse the DataSets that are distributed by the Flink runtime.
This would allow me to avoid the performance bottleneck that I described.
​Hopefully someone may shed light on this.​

​Thanks for your attention.​


​References:​




Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra







Reply | Threaded
Open this post in threaded view
|

Re: How to perform efficient DataSet reuse between iterations

Fabian Hueske-2
Hi Miguel,

if the message size would be the problem, the client should fail with an exception.
What might happen, is that the client gets stuck while optimizing the program.

You could take a stacktrace of the client process to identify at which part the client gets stuck.

Best, Fabian

2017-12-06 3:01 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thanks for the help.
I am interested in the duration of specific operators, so the fact that parts of the execution are in pipeline is not a problem for me.
From my understanding, the automated way to approach this is to run the Flink job with the web interface active and then make a REST call on the appropriate job and parse the JSON.
I have implemented that and my program is able to read the duration of specific operators.

However, I'm facing another problem whose cause I haven't been able to pinpoint.

Testing on 1.4.0-SNAPSHOT.

When launching a cluster (start-local.bat on Windows or start-cluster.sh on Linux), the JobManager, the TaskManager(s) are launched and the web front end becomes active (I can access it via browser) - everything is ok so far.
The problem occurs when the number of operators in the plan increases.

Consider that I execute my algorithm three (3) times through a single Flink plan.
For three times, vertices and edges will be added to the graph (via Gelly methods).
This is defined in a for loop in my program. For each iteration:

// I add 100 edges to the graph, decomposed in a list of vertices and edges
final Graph<Long, NullValue, NullValue> newGraph = graph
    .addVertices(verticesToAdd)
    .addEdges(edgesToAdd);
                   
// Generate identifications for the vertex counter.
final String vid = new AbstractID().toString();
newGraph.getVertices().output(new Utils.CountHelper<Vertex<Long, NullValue>>(vid)).name("count()");
vids.put(executionCounter, vid);

// Generate identifications for the edge counter.
final String eid = new AbstractID().toString();
newGraph.getEdges().output(new Utils.CountHelper<Edge<Long, NullValue>>(eid)).name("count()");
eids.put(executionCounter, eid);


So far I have created 2 sinks in the current iteration in my regular Java program.
Then I execute my graph algorithm with the previous results:

// Execute the graph algorithm and output some of its results
result = newGraph.run(new MyGraphAlgorithm<>(previousResults));
result.sortPartition(1, Order.DESCENDING).setParallelism(1).first(1000).output(outputFormat).name("Store results");
previousResults = result;

This creates one additional sink via the output function.
So for three (3) iterations, I define nine (9) sinks in the plan, call execute() and afterward retrieve the contents of the sinks.
This works fine so far.

If I run with 10 iterations, I will be creating 30 sinks.
The problem is that for 10 iterations, the Flink client program just hangs on the execute() call forever (execution time should increase linearly with the number of iterations).
For 3 iterations, execute() proceeds normally and it takes around 20 seconds per iteration, so 3 iterations is 60 seconds and 10 should be around 3 minutes.
After five hours, there was no progress.

Furthermore, I checked the web monitor periodically and there was not a single job.
It seems that the client program is simply not sending the job to the cluster if the job plan becomes too big.
The exact same compiled program, with 3 iterations (via argument) works, but with 10 (via argument) it simply falls silent.

I am trying to understand what may be the problem:

- An internal limit in the number of datasinks or operators in the plan?

- A limit in message size preventing the client from sending the job?
I have tried increasing the akka.framesize to 256000kB in the Flink server flink-conf.yaml config and in the client program when creating the remote environment with:

Configuration clientConfig = new Configuration();
final String akkaFrameSize = "256000kB";
ConfigOption<String> akkaConfig = ConfigOptions.key(AkkaOptions.FRAMESIZE.key()).defaultValue(akkaFrameSize);
clientConfig.setString(akkaConfig, akkaFrameSize);
env = new RemoteEnvironment(remoteAddress, remotePort, clientConfig, jarFiles, null);


I have run out of ideas with respect to the causes.
Hoping you may be able to shed some light.

Thank you for your time,

Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 29 November 2017 at 10:23, Fabian Hueske <[hidden email]> wrote:
The monitoring REST interface provides detailed stats about a job, its tasks, and processing verticies including their start and end time [1].

However, it is not trivial to make sense of the execution times because Flink uses pipelined shuffles by default.
That means that the execution of multiple operators can overlap. For example the records that are produced by a GroupReduce can be processed by a Map, shuffled, and sorted (for another GroupReduce) in a pipelined fashion.
Hence, all these operations run at the same time. You can disable this behavior to some extend by setting the execution mode to batched shuffles [2].
However, this will likely have a negative impact on the overall execution time.

2017-11-29 0:44 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

You're right, I was overlooking that.
With your suggestion, I now just define a different sink in each iteration of the loop.
Then they all output to disk when executing a single bigger plan.

I have one more question: I know I can retrieve the total time this single job takes to execute, but what if I want to know the time taken for specific operators in the dag?
Is there some functionality in the Flink Batch API akin to counting elements but for measuring time instead?
For example, if I am not mistaken, an operator can be executed in parallel or serially (with a parallelism of one).

Is there a straightforward way to get the time taken by the operator's tasks?
In a way that I could:

a) just get the time of a single task (if running serially) to get the total operator execution time;
b) know the time taken by each parallel component of the operator's execution so I could know where and what was the "lagging element" in the operator's execution.

Is this possible? I was hoping I could retrieve this information in the Java program itself and avoid processing logs.

Thanks again.

Best regards,


Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 28 November 2017 at 08:56, Fabian Hueske <[hidden email]> wrote:
Hi,

by calling result.count(), you compute the complete plan from the beginning and not just the operations you added since the last execution.
Looking at the output you posted, each step takes about 15 seconds (with about 5 secs of initialization).
So the 20 seconds of the first step include initialization + 1st step.
The 35 seconds on the second step include initialization, 1st step + 2nd step.
If you don't call count on the intermediate steps, you can compute the 4th step in 65 seconds.

Implementing a caching operator would be a pretty huge effort because you need to touch code at many places such as the API, optimizer, runtime, scheduling, etc.
The documentation you found should still be applicable. There hasn't been major additions to the DataSet API and runtime in the last releases.

Best, Fabian



2017-11-28 9:14 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thank you for the reply.
I was hoping the situation had in fact changed.

As far as I know, I am not calling execute() directly even once - it is being called implicitly by simple DataSink elements added to the plan through count():

System.out.println(String.format("%d-th graph algorithm produced %d elements. (%d.%d s).",
                            executionCounter,
                            result.count(), // this would trigger execution...
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS),
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS) % 1000));


I have taken a look at Flink's code base (e.g. how the dataflow dag is processed with classes such as  org.apache.flink.optimizer.traversals.GraphCreatingVisitor, org.apache.flink.api.java.operators.OperatorTranslation) but I'm not sure on the most direct way to achieve this.
Perhaps I missed some online documentation that would help to get a grip on how to contribute to the different parts of Flink?

I did find some information which hints at implementing this sort of thing (such as adding custom operators) but it was associated to an old version of Flink:
However, as far as I know there is no equivalent page in the current online stable or snapshot documentation.

What would be the best way to go about this?

It really seems that the DataSet stored in the result variable is always representing an increasing sequence of executions and not just the results of the last execution.



Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 27 November 2017 at 22:56, Fabian Hueske <[hidden email]> wrote:
Hi Miguel,

I'm sorry but AFAIK, the situation has not changed.

Is it possible that you are calling execute() multiple times?
In that case, the 1-st and 2-nd graph would be recomputed before the 3-rd graph is computed.
That would explain the increasing execution time of 15 seconds.

Best, Fabian

2017-11-26 17:45 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

I'm facing a problem in an algorithm where I would like to constantly update a DataSet representing a graph, perform some computation, output one or more DataSink (such as a file on the local system) and then reuse the DataSet for a next iteration.
​I want to avoid spilling the results to disk at the end of an iteration and to read it back in the next iterations - the graph is very big and I do not wish to incur that time overhead.
I want to reuse the full result DataSet of each iteration in the next one and I want to save to disk a small percentage of the produced DataSet for each iteration.
The space complexity is rather constant - the number of edges in the graph increases by only 100 between iterations (which is an extremely low percentage of the original graph's edges) and is obtained using env.fromCollection(edgesToAdd).

Although I am using Flink's Gelly API for graphs, I have no problem working directly with the underlying vertex and edge DataSet elements.​

Two ways to do this occur to me, but it seems both are currently not supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion [1]:

«​
Unfortunately, it is not currently possible to output intermediate results from a bulk iteration.
You can only output the final result at the end of the iteration.
Also, as you correctly noticed, Flink cannot efficiently unroll a while-loop or for-loop, so that won't work either.»


1. I thought I could create a bulk iteration, perform the computation and between iterations, output the result to the file system.
However, this is not possible, as per Vasia's answer, and produces the following exception on execution when I try (for example, to calculate a centrality metric for every vertex and dump the results to disk), as expected based on that information:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

2. Using a for loop in my own program and triggering sequential Flink job executions.
Problem: in this scenario, while I am able to use a DataSet produced in an iteration's Flink job (and dump the desired output information to disk) and pass it to the next Flink job, the computation time increases constantly:
(I also tried manually starting a session which is kept open with env.startNewSession() before the loop - no impact)

​​
Initial graph has 33511 vertices and 411578 edges.
Added 113 vertices and 100 edges.
1-th graph now has 33524 vertices and 411678 edges (2.543 s).
1-th graph algorithm produced 33524 elements. (20.96 s).
Added 222 vertices and 200 edges.
2-th graph now has 33536 vertices and 411778 edges (1.919 s).
2-th
graph algorithm produced 33536 elements. (35.913 s).
Added 326 vertices and 300 edges.
3-th graph now has 33543 vertices and 411878 edges (1.825 s).
3-th
graph algorithm produced 33543 elements. (49.624 s).
Added 436 vertices and 400 edges.
4-th graph now has 33557 vertices and 411978 edges (1.482 s).
4-th
graph algorithm produced 33557 elements. (66.209 s).

Note that the number of elements in the output DataSet is equal to the number of vertices in the graph.
On iteration i in my program, the executed graph algorithm incorporates the result DataSet of iteration i - 1 by means of the g.joinWithVertices(previousResultDataSet, new RanksJoinFunction()) function.

The VertexJoinFunction is a simple forwarding mechanism to set the previous values:

@FunctionAnnotation.ForwardedFieldsFirst("*->*")
private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
    @Override
    public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
        return inputValue;
    }
}


​I have also used Flink's plan visualizer to check for discrepancies between the first iteration and the tenth (for example), but the layout of the plan remains exactly the same while the execution time continually increases for what should be the same amount of computations.

Bottom-line: ​I was hoping someone could tell me how to overcome the performance bottleneck using the sequential job approach or enabling the output of intermediate results using Flink's Bulk Iterations.
​​​I believe others have stumbled upon this limitation before [2, 3].​​
I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit using a local environment:

final Configuration conf = new Configuration();
final LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
final ExecutionEnvironment env = lenv;
env.getConfig().disableSysoutLogging().setParallelism(1);


​I wish to ​execute in a cluster later on with a bigger dataset, so it would be essential that to maximize the ability to reuse the DataSets that are distributed by the Flink runtime.
This would allow me to avoid the performance bottleneck that I described.
​Hopefully someone may shed light on this.​

​Thanks for your attention.​


​References:​




Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra








Reply | Threaded
Open this post in threaded view
|

Re: How to perform efficient DataSet reuse between iterations

Miguel Coimbra
Hello Fabian,

After increasing the message size akka parameter, the client resulted in the following exception after some time.
This confirms that the JobManager never received the job request:

[WARNING]
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:294)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.flink.optimizer.costs.CostEstimator.costOperator(CostEstimator.java:78)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:516)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:344)
        at org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:193)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:496)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
        at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:349)
        at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:812)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
        at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
        at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
        at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
        at pt.ulisboa.tecnico.graph.util.GraphSequenceTest.optimizedAccumulationLoop(GraphSequenceTest.java:304)
        at pt.ulisboa.tecnico.graph.util.GraphSequenceTest.main(GraphSequenceTest.java:204)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)


I hope I am not hitting a formal limit of Flink?

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 6 December 2017 at 08:48, Fabian Hueske <[hidden email]> wrote:
Hi Miguel,

if the message size would be the problem, the client should fail with an exception.
What might happen, is that the client gets stuck while optimizing the program.

You could take a stacktrace of the client process to identify at which part the client gets stuck.

Best, Fabian

2017-12-06 3:01 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thanks for the help.
I am interested in the duration of specific operators, so the fact that parts of the execution are in pipeline is not a problem for me.
From my understanding, the automated way to approach this is to run the Flink job with the web interface active and then make a REST call on the appropriate job and parse the JSON.
I have implemented that and my program is able to read the duration of specific operators.

However, I'm facing another problem whose cause I haven't been able to pinpoint.

Testing on 1.4.0-SNAPSHOT.

When launching a cluster (start-local.bat on Windows or start-cluster.sh on Linux), the JobManager, the TaskManager(s) are launched and the web front end becomes active (I can access it via browser) - everything is ok so far.
The problem occurs when the number of operators in the plan increases.

Consider that I execute my algorithm three (3) times through a single Flink plan.
For three times, vertices and edges will be added to the graph (via Gelly methods).
This is defined in a for loop in my program. For each iteration:

// I add 100 edges to the graph, decomposed in a list of vertices and edges
final Graph<Long, NullValue, NullValue> newGraph = graph
    .addVertices(verticesToAdd)
    .addEdges(edgesToAdd);
                   
// Generate identifications for the vertex counter.
final String vid = new AbstractID().toString();
newGraph.getVertices().output(new Utils.CountHelper<Vertex<Long, NullValue>>(vid)).name("count()");
vids.put(executionCounter, vid);

// Generate identifications for the edge counter.
final String eid = new AbstractID().toString();
newGraph.getEdges().output(new Utils.CountHelper<Edge<Long, NullValue>>(eid)).name("count()");
eids.put(executionCounter, eid);


So far I have created 2 sinks in the current iteration in my regular Java program.
Then I execute my graph algorithm with the previous results:

// Execute the graph algorithm and output some of its results
result = newGraph.run(new MyGraphAlgorithm<>(previousResults));
result.sortPartition(1, Order.DESCENDING).setParallelism(1).first(1000).output(outputFormat).name("Store results");
previousResults = result;

This creates one additional sink via the output function.
So for three (3) iterations, I define nine (9) sinks in the plan, call execute() and afterward retrieve the contents of the sinks.
This works fine so far.

If I run with 10 iterations, I will be creating 30 sinks.
The problem is that for 10 iterations, the Flink client program just hangs on the execute() call forever (execution time should increase linearly with the number of iterations).
For 3 iterations, execute() proceeds normally and it takes around 20 seconds per iteration, so 3 iterations is 60 seconds and 10 should be around 3 minutes.
After five hours, there was no progress.

Furthermore, I checked the web monitor periodically and there was not a single job.
It seems that the client program is simply not sending the job to the cluster if the job plan becomes too big.
The exact same compiled program, with 3 iterations (via argument) works, but with 10 (via argument) it simply falls silent.

I am trying to understand what may be the problem:

- An internal limit in the number of datasinks or operators in the plan?

- A limit in message size preventing the client from sending the job?
I have tried increasing the akka.framesize to 256000kB in the Flink server flink-conf.yaml config and in the client program when creating the remote environment with:

Configuration clientConfig = new Configuration();
final String akkaFrameSize = "256000kB";
ConfigOption<String> akkaConfig = ConfigOptions.key(AkkaOptions.FRAMESIZE.key()).defaultValue(akkaFrameSize);
clientConfig.setString(akkaConfig, akkaFrameSize);
env = new RemoteEnvironment(remoteAddress, remotePort, clientConfig, jarFiles, null);


I have run out of ideas with respect to the causes.
Hoping you may be able to shed some light.

Thank you for your time,

Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 29 November 2017 at 10:23, Fabian Hueske <[hidden email]> wrote:
The monitoring REST interface provides detailed stats about a job, its tasks, and processing verticies including their start and end time [1].

However, it is not trivial to make sense of the execution times because Flink uses pipelined shuffles by default.
That means that the execution of multiple operators can overlap. For example the records that are produced by a GroupReduce can be processed by a Map, shuffled, and sorted (for another GroupReduce) in a pipelined fashion.
Hence, all these operations run at the same time. You can disable this behavior to some extend by setting the execution mode to batched shuffles [2].
However, this will likely have a negative impact on the overall execution time.

2017-11-29 0:44 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

You're right, I was overlooking that.
With your suggestion, I now just define a different sink in each iteration of the loop.
Then they all output to disk when executing a single bigger plan.

I have one more question: I know I can retrieve the total time this single job takes to execute, but what if I want to know the time taken for specific operators in the dag?
Is there some functionality in the Flink Batch API akin to counting elements but for measuring time instead?
For example, if I am not mistaken, an operator can be executed in parallel or serially (with a parallelism of one).

Is there a straightforward way to get the time taken by the operator's tasks?
In a way that I could:

a) just get the time of a single task (if running serially) to get the total operator execution time;
b) know the time taken by each parallel component of the operator's execution so I could know where and what was the "lagging element" in the operator's execution.

Is this possible? I was hoping I could retrieve this information in the Java program itself and avoid processing logs.

Thanks again.

Best regards,


Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 28 November 2017 at 08:56, Fabian Hueske <[hidden email]> wrote:
Hi,

by calling result.count(), you compute the complete plan from the beginning and not just the operations you added since the last execution.
Looking at the output you posted, each step takes about 15 seconds (with about 5 secs of initialization).
So the 20 seconds of the first step include initialization + 1st step.
The 35 seconds on the second step include initialization, 1st step + 2nd step.
If you don't call count on the intermediate steps, you can compute the 4th step in 65 seconds.

Implementing a caching operator would be a pretty huge effort because you need to touch code at many places such as the API, optimizer, runtime, scheduling, etc.
The documentation you found should still be applicable. There hasn't been major additions to the DataSet API and runtime in the last releases.

Best, Fabian



2017-11-28 9:14 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thank you for the reply.
I was hoping the situation had in fact changed.

As far as I know, I am not calling execute() directly even once - it is being called implicitly by simple DataSink elements added to the plan through count():

System.out.println(String.format("%d-th graph algorithm produced %d elements. (%d.%d s).",
                            executionCounter,
                            result.count(), // this would trigger execution...
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS),
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS) % 1000));


I have taken a look at Flink's code base (e.g. how the dataflow dag is processed with classes such as  org.apache.flink.optimizer.traversals.GraphCreatingVisitor, org.apache.flink.api.java.operators.OperatorTranslation) but I'm not sure on the most direct way to achieve this.
Perhaps I missed some online documentation that would help to get a grip on how to contribute to the different parts of Flink?

I did find some information which hints at implementing this sort of thing (such as adding custom operators) but it was associated to an old version of Flink:
However, as far as I know there is no equivalent page in the current online stable or snapshot documentation.

What would be the best way to go about this?

It really seems that the DataSet stored in the result variable is always representing an increasing sequence of executions and not just the results of the last execution.



Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 27 November 2017 at 22:56, Fabian Hueske <[hidden email]> wrote:
Hi Miguel,

I'm sorry but AFAIK, the situation has not changed.

Is it possible that you are calling execute() multiple times?
In that case, the 1-st and 2-nd graph would be recomputed before the 3-rd graph is computed.
That would explain the increasing execution time of 15 seconds.

Best, Fabian

2017-11-26 17:45 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

I'm facing a problem in an algorithm where I would like to constantly update a DataSet representing a graph, perform some computation, output one or more DataSink (such as a file on the local system) and then reuse the DataSet for a next iteration.
​I want to avoid spilling the results to disk at the end of an iteration and to read it back in the next iterations - the graph is very big and I do not wish to incur that time overhead.
I want to reuse the full result DataSet of each iteration in the next one and I want to save to disk a small percentage of the produced DataSet for each iteration.
The space complexity is rather constant - the number of edges in the graph increases by only 100 between iterations (which is an extremely low percentage of the original graph's edges) and is obtained using env.fromCollection(edgesToAdd).

Although I am using Flink's Gelly API for graphs, I have no problem working directly with the underlying vertex and edge DataSet elements.​

Two ways to do this occur to me, but it seems both are currently not supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion [1]:

«​
Unfortunately, it is not currently possible to output intermediate results from a bulk iteration.
You can only output the final result at the end of the iteration.
Also, as you correctly noticed, Flink cannot efficiently unroll a while-loop or for-loop, so that won't work either.»


1. I thought I could create a bulk iteration, perform the computation and between iterations, output the result to the file system.
However, this is not possible, as per Vasia's answer, and produces the following exception on execution when I try (for example, to calculate a centrality metric for every vertex and dump the results to disk), as expected based on that information:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

2. Using a for loop in my own program and triggering sequential Flink job executions.
Problem: in this scenario, while I am able to use a DataSet produced in an iteration's Flink job (and dump the desired output information to disk) and pass it to the next Flink job, the computation time increases constantly:
(I also tried manually starting a session which is kept open with env.startNewSession() before the loop - no impact)

​​
Initial graph has 33511 vertices and 411578 edges.
Added 113 vertices and 100 edges.
1-th graph now has 33524 vertices and 411678 edges (2.543 s).
1-th graph algorithm produced 33524 elements. (20.96 s).
Added 222 vertices and 200 edges.
2-th graph now has 33536 vertices and 411778 edges (1.919 s).
2-th
graph algorithm produced 33536 elements. (35.913 s).
Added 326 vertices and 300 edges.
3-th graph now has 33543 vertices and 411878 edges (1.825 s).
3-th
graph algorithm produced 33543 elements. (49.624 s).
Added 436 vertices and 400 edges.
4-th graph now has 33557 vertices and 411978 edges (1.482 s).
4-th
graph algorithm produced 33557 elements. (66.209 s).

Note that the number of elements in the output DataSet is equal to the number of vertices in the graph.
On iteration i in my program, the executed graph algorithm incorporates the result DataSet of iteration i - 1 by means of the g.joinWithVertices(previousResultDataSet, new RanksJoinFunction()) function.

The VertexJoinFunction is a simple forwarding mechanism to set the previous values:

@FunctionAnnotation.ForwardedFieldsFirst("*->*")
private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
    @Override
    public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
        return inputValue;
    }
}


​I have also used Flink's plan visualizer to check for discrepancies between the first iteration and the tenth (for example), but the layout of the plan remains exactly the same while the execution time continually increases for what should be the same amount of computations.

Bottom-line: ​I was hoping someone could tell me how to overcome the performance bottleneck using the sequential job approach or enabling the output of intermediate results using Flink's Bulk Iterations.
​​​I believe others have stumbled upon this limitation before [2, 3].​​
I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit using a local environment:

final Configuration conf = new Configuration();
final LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
final ExecutionEnvironment env = lenv;
env.getConfig().disableSysoutLogging().setParallelism(1);


​I wish to ​execute in a cluster later on with a bigger dataset, so it would be essential that to maximize the ability to reuse the DataSets that are distributed by the Flink runtime.
This would allow me to avoid the performance bottleneck that I described.
​Hopefully someone may shed light on this.​

​Thanks for your attention.​


​References:​




Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra









Reply | Threaded
Open this post in threaded view
|

Re: How to perform efficient DataSet reuse between iterations

Fabian Hueske-2
Hmm, this does not look too good.
As I expected, the program gets stuck in the optimizer. Plan optimization can be quite expensive for large plans.

There might be a way to improve the optimization of large plans by cutting the plan space but I would not expect this to be fixed in the near future.
Touching the optimizer is a delicate issue and requires a lot of care and effort.

I would try to increase the heap size of the client JVM (check the /bin/flink file which starts the client JVM).
This should bring down the GC overhead, but the computational complexity of enumerating plans would remain the same.
You might want to have a look at semantic annotations [1]. Adding these to your user functions should have an effect on the plan enumeration.

If this doesn't help, the only solution might be to cut the program into multiple pieces and spill intermediate results to disk.

Best, Fabian


2017-12-06 11:10 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

After increasing the message size akka parameter, the client resulted in the following exception after some time.
This confirms that the JobManager never received the job request:

[WARNING]
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:294)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.flink.optimizer.costs.CostEstimator.costOperator(CostEstimator.java:78)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:516)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:344)
        at org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:193)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:496)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
        at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:349)
        at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:812)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
        at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
        at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
        at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
        at pt.ulisboa.tecnico.graph.util.GraphSequenceTest.optimizedAccumulationLoop(GraphSequenceTest.java:304)
        at pt.ulisboa.tecnico.graph.util.GraphSequenceTest.main(GraphSequenceTest.java:204)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)


I hope I am not hitting a formal limit of Flink?

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 6 December 2017 at 08:48, Fabian Hueske <[hidden email]> wrote:
Hi Miguel,

if the message size would be the problem, the client should fail with an exception.
What might happen, is that the client gets stuck while optimizing the program.

You could take a stacktrace of the client process to identify at which part the client gets stuck.

Best, Fabian

2017-12-06 3:01 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thanks for the help.
I am interested in the duration of specific operators, so the fact that parts of the execution are in pipeline is not a problem for me.
From my understanding, the automated way to approach this is to run the Flink job with the web interface active and then make a REST call on the appropriate job and parse the JSON.
I have implemented that and my program is able to read the duration of specific operators.

However, I'm facing another problem whose cause I haven't been able to pinpoint.

Testing on 1.4.0-SNAPSHOT.

When launching a cluster (start-local.bat on Windows or start-cluster.sh on Linux), the JobManager, the TaskManager(s) are launched and the web front end becomes active (I can access it via browser) - everything is ok so far.
The problem occurs when the number of operators in the plan increases.

Consider that I execute my algorithm three (3) times through a single Flink plan.
For three times, vertices and edges will be added to the graph (via Gelly methods).
This is defined in a for loop in my program. For each iteration:

// I add 100 edges to the graph, decomposed in a list of vertices and edges
final Graph<Long, NullValue, NullValue> newGraph = graph
    .addVertices(verticesToAdd)
    .addEdges(edgesToAdd);
                   
// Generate identifications for the vertex counter.
final String vid = new AbstractID().toString();
newGraph.getVertices().output(new Utils.CountHelper<Vertex<Long, NullValue>>(vid)).name("count()");
vids.put(executionCounter, vid);

// Generate identifications for the edge counter.
final String eid = new AbstractID().toString();
newGraph.getEdges().output(new Utils.CountHelper<Edge<Long, NullValue>>(eid)).name("count()");
eids.put(executionCounter, eid);


So far I have created 2 sinks in the current iteration in my regular Java program.
Then I execute my graph algorithm with the previous results:

// Execute the graph algorithm and output some of its results
result = newGraph.run(new MyGraphAlgorithm<>(previousResults));
result.sortPartition(1, Order.DESCENDING).setParallelism(1).first(1000).output(outputFormat).name("Store results");
previousResults = result;

This creates one additional sink via the output function.
So for three (3) iterations, I define nine (9) sinks in the plan, call execute() and afterward retrieve the contents of the sinks.
This works fine so far.

If I run with 10 iterations, I will be creating 30 sinks.
The problem is that for 10 iterations, the Flink client program just hangs on the execute() call forever (execution time should increase linearly with the number of iterations).
For 3 iterations, execute() proceeds normally and it takes around 20 seconds per iteration, so 3 iterations is 60 seconds and 10 should be around 3 minutes.
After five hours, there was no progress.

Furthermore, I checked the web monitor periodically and there was not a single job.
It seems that the client program is simply not sending the job to the cluster if the job plan becomes too big.
The exact same compiled program, with 3 iterations (via argument) works, but with 10 (via argument) it simply falls silent.

I am trying to understand what may be the problem:

- An internal limit in the number of datasinks or operators in the plan?

- A limit in message size preventing the client from sending the job?
I have tried increasing the akka.framesize to 256000kB in the Flink server flink-conf.yaml config and in the client program when creating the remote environment with:

Configuration clientConfig = new Configuration();
final String akkaFrameSize = "256000kB";
ConfigOption<String> akkaConfig = ConfigOptions.key(AkkaOptions.FRAMESIZE.key()).defaultValue(akkaFrameSize);
clientConfig.setString(akkaConfig, akkaFrameSize);
env = new RemoteEnvironment(remoteAddress, remotePort, clientConfig, jarFiles, null);


I have run out of ideas with respect to the causes.
Hoping you may be able to shed some light.

Thank you for your time,

Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 29 November 2017 at 10:23, Fabian Hueske <[hidden email]> wrote:
The monitoring REST interface provides detailed stats about a job, its tasks, and processing verticies including their start and end time [1].

However, it is not trivial to make sense of the execution times because Flink uses pipelined shuffles by default.
That means that the execution of multiple operators can overlap. For example the records that are produced by a GroupReduce can be processed by a Map, shuffled, and sorted (for another GroupReduce) in a pipelined fashion.
Hence, all these operations run at the same time. You can disable this behavior to some extend by setting the execution mode to batched shuffles [2].
However, this will likely have a negative impact on the overall execution time.

2017-11-29 0:44 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

You're right, I was overlooking that.
With your suggestion, I now just define a different sink in each iteration of the loop.
Then they all output to disk when executing a single bigger plan.

I have one more question: I know I can retrieve the total time this single job takes to execute, but what if I want to know the time taken for specific operators in the dag?
Is there some functionality in the Flink Batch API akin to counting elements but for measuring time instead?
For example, if I am not mistaken, an operator can be executed in parallel or serially (with a parallelism of one).

Is there a straightforward way to get the time taken by the operator's tasks?
In a way that I could:

a) just get the time of a single task (if running serially) to get the total operator execution time;
b) know the time taken by each parallel component of the operator's execution so I could know where and what was the "lagging element" in the operator's execution.

Is this possible? I was hoping I could retrieve this information in the Java program itself and avoid processing logs.

Thanks again.

Best regards,


Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 28 November 2017 at 08:56, Fabian Hueske <[hidden email]> wrote:
Hi,

by calling result.count(), you compute the complete plan from the beginning and not just the operations you added since the last execution.
Looking at the output you posted, each step takes about 15 seconds (with about 5 secs of initialization).
So the 20 seconds of the first step include initialization + 1st step.
The 35 seconds on the second step include initialization, 1st step + 2nd step.
If you don't call count on the intermediate steps, you can compute the 4th step in 65 seconds.

Implementing a caching operator would be a pretty huge effort because you need to touch code at many places such as the API, optimizer, runtime, scheduling, etc.
The documentation you found should still be applicable. There hasn't been major additions to the DataSet API and runtime in the last releases.

Best, Fabian



2017-11-28 9:14 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thank you for the reply.
I was hoping the situation had in fact changed.

As far as I know, I am not calling execute() directly even once - it is being called implicitly by simple DataSink elements added to the plan through count():

System.out.println(String.format("%d-th graph algorithm produced %d elements. (%d.%d s).",
                            executionCounter,
                            result.count(), // this would trigger execution...
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS),
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS) % 1000));


I have taken a look at Flink's code base (e.g. how the dataflow dag is processed with classes such as  org.apache.flink.optimizer.traversals.GraphCreatingVisitor, org.apache.flink.api.java.operators.OperatorTranslation) but I'm not sure on the most direct way to achieve this.
Perhaps I missed some online documentation that would help to get a grip on how to contribute to the different parts of Flink?

I did find some information which hints at implementing this sort of thing (such as adding custom operators) but it was associated to an old version of Flink:
However, as far as I know there is no equivalent page in the current online stable or snapshot documentation.

What would be the best way to go about this?

It really seems that the DataSet stored in the result variable is always representing an increasing sequence of executions and not just the results of the last execution.



Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 27 November 2017 at 22:56, Fabian Hueske <[hidden email]> wrote:
Hi Miguel,

I'm sorry but AFAIK, the situation has not changed.

Is it possible that you are calling execute() multiple times?
In that case, the 1-st and 2-nd graph would be recomputed before the 3-rd graph is computed.
That would explain the increasing execution time of 15 seconds.

Best, Fabian

2017-11-26 17:45 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

I'm facing a problem in an algorithm where I would like to constantly update a DataSet representing a graph, perform some computation, output one or more DataSink (such as a file on the local system) and then reuse the DataSet for a next iteration.
​I want to avoid spilling the results to disk at the end of an iteration and to read it back in the next iterations - the graph is very big and I do not wish to incur that time overhead.
I want to reuse the full result DataSet of each iteration in the next one and I want to save to disk a small percentage of the produced DataSet for each iteration.
The space complexity is rather constant - the number of edges in the graph increases by only 100 between iterations (which is an extremely low percentage of the original graph's edges) and is obtained using env.fromCollection(edgesToAdd).

Although I am using Flink's Gelly API for graphs, I have no problem working directly with the underlying vertex and edge DataSet elements.​

Two ways to do this occur to me, but it seems both are currently not supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion [1]:

«​
Unfortunately, it is not currently possible to output intermediate results from a bulk iteration.
You can only output the final result at the end of the iteration.
Also, as you correctly noticed, Flink cannot efficiently unroll a while-loop or for-loop, so that won't work either.»


1. I thought I could create a bulk iteration, perform the computation and between iterations, output the result to the file system.
However, this is not possible, as per Vasia's answer, and produces the following exception on execution when I try (for example, to calculate a centrality metric for every vertex and dump the results to disk), as expected based on that information:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

2. Using a for loop in my own program and triggering sequential Flink job executions.
Problem: in this scenario, while I am able to use a DataSet produced in an iteration's Flink job (and dump the desired output information to disk) and pass it to the next Flink job, the computation time increases constantly:
(I also tried manually starting a session which is kept open with env.startNewSession() before the loop - no impact)

​​
Initial graph has 33511 vertices and 411578 edges.
Added 113 vertices and 100 edges.
1-th graph now has 33524 vertices and 411678 edges (2.543 s).
1-th graph algorithm produced 33524 elements. (20.96 s).
Added 222 vertices and 200 edges.
2-th graph now has 33536 vertices and 411778 edges (1.919 s).
2-th
graph algorithm produced 33536 elements. (35.913 s).
Added 326 vertices and 300 edges.
3-th graph now has 33543 vertices and 411878 edges (1.825 s).
3-th
graph algorithm produced 33543 elements. (49.624 s).
Added 436 vertices and 400 edges.
4-th graph now has 33557 vertices and 411978 edges (1.482 s).
4-th
graph algorithm produced 33557 elements. (66.209 s).

Note that the number of elements in the output DataSet is equal to the number of vertices in the graph.
On iteration i in my program, the executed graph algorithm incorporates the result DataSet of iteration i - 1 by means of the g.joinWithVertices(previousResultDataSet, new RanksJoinFunction()) function.

The VertexJoinFunction is a simple forwarding mechanism to set the previous values:

@FunctionAnnotation.ForwardedFieldsFirst("*->*")
private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
    @Override
    public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
        return inputValue;
    }
}


​I have also used Flink's plan visualizer to check for discrepancies between the first iteration and the tenth (for example), but the layout of the plan remains exactly the same while the execution time continually increases for what should be the same amount of computations.

Bottom-line: ​I was hoping someone could tell me how to overcome the performance bottleneck using the sequential job approach or enabling the output of intermediate results using Flink's Bulk Iterations.
​​​I believe others have stumbled upon this limitation before [2, 3].​​
I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit using a local environment:

final Configuration conf = new Configuration();
final LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
final ExecutionEnvironment env = lenv;
env.getConfig().disableSysoutLogging().setParallelism(1);


​I wish to ​execute in a cluster later on with a bigger dataset, so it would be essential that to maximize the ability to reuse the DataSets that are distributed by the Flink runtime.
This would allow me to avoid the performance bottleneck that I described.
​Hopefully someone may shed light on this.​

​Thanks for your attention.​


​References:​




Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra










Reply | Threaded
Open this post in threaded view
|

Re: How to perform efficient DataSet reuse between iterations

Miguel Coimbra
Hello Fabian,

It really looks like an issue requiring attention.

Since I am launching the client application via Maven, I opted to change the maximum memory setting with
export MAVEN_OPTS="-Xms256m -Xmx40000m".
To give an example, for three (3) iterations it worked fine with around 4 GB of memory, but for ten (10), launching a client with a limit of 40 GB of memory still resulted in the same exception.
The client application did consume memory into the 30-40 GB range before running out of memory.

Since I am working with very big datasets, spilling to disk becomes impractical with respect to conducting experiments.
A dataset of thirty (30) GB would take a lot of time if we were to spill to disk a hundred times.

That is why I was asking about a caching operator or spilling intermediate results in a BulkIteration.
You said the caching operator would not be trivial to implement.
However, would it also be very hard to allow for spilling in Flink's BulkIteration or DeltaIteration?

I am not sure I can use semantic annotations because the program just adds vertices and edges to the graph (Gelly API) and runs an algorithm implementing the GraphAlgorithm interface with ScatterFunction and GatherFunction class extensions.
There are some join functions though, I will look into applying them.

Besides this, can you recommend an initial place in the code where one should look to begin studying the optimizer?

Thanks for your time once more,

Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 6 December 2017 at 10:27, Fabian Hueske <[hidden email]> wrote:
Hmm, this does not look too good.
As I expected, the program gets stuck in the optimizer. Plan optimization can be quite expensive for large plans.

There might be a way to improve the optimization of large plans by cutting the plan space but I would not expect this to be fixed in the near future.
Touching the optimizer is a delicate issue and requires a lot of care and effort.

I would try to increase the heap size of the client JVM (check the /bin/flink file which starts the client JVM).
This should bring down the GC overhead, but the computational complexity of enumerating plans would remain the same.
You might want to have a look at semantic annotations [1]. Adding these to your user functions should have an effect on the plan enumeration.

If this doesn't help, the only solution might be to cut the program into multiple pieces and spill intermediate results to disk.

Best, Fabian


2017-12-06 11:10 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

After increasing the message size akka parameter, the client resulted in the following exception after some time.
This confirms that the JobManager never received the job request:

[WARNING]
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:294)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.flink.optimizer.costs.CostEstimator.costOperator(CostEstimator.java:78)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:516)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:344)
        at org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:193)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:496)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
        at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:349)
        at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:812)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
        at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
        at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
        at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
        at pt.ulisboa.tecnico.graph.util.GraphSequenceTest.optimizedAccumulationLoop(GraphSequenceTest.java:304)
        at pt.ulisboa.tecnico.graph.util.GraphSequenceTest.main(GraphSequenceTest.java:204)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)


I hope I am not hitting a formal limit of Flink?

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 6 December 2017 at 08:48, Fabian Hueske <[hidden email]> wrote:
Hi Miguel,

if the message size would be the problem, the client should fail with an exception.
What might happen, is that the client gets stuck while optimizing the program.

You could take a stacktrace of the client process to identify at which part the client gets stuck.

Best, Fabian

2017-12-06 3:01 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thanks for the help.
I am interested in the duration of specific operators, so the fact that parts of the execution are in pipeline is not a problem for me.
From my understanding, the automated way to approach this is to run the Flink job with the web interface active and then make a REST call on the appropriate job and parse the JSON.
I have implemented that and my program is able to read the duration of specific operators.

However, I'm facing another problem whose cause I haven't been able to pinpoint.

Testing on 1.4.0-SNAPSHOT.

When launching a cluster (start-local.bat on Windows or start-cluster.sh on Linux), the JobManager, the TaskManager(s) are launched and the web front end becomes active (I can access it via browser) - everything is ok so far.
The problem occurs when the number of operators in the plan increases.

Consider that I execute my algorithm three (3) times through a single Flink plan.
For three times, vertices and edges will be added to the graph (via Gelly methods).
This is defined in a for loop in my program. For each iteration:

// I add 100 edges to the graph, decomposed in a list of vertices and edges
final Graph<Long, NullValue, NullValue> newGraph = graph
    .addVertices(verticesToAdd)
    .addEdges(edgesToAdd);
                   
// Generate identifications for the vertex counter.
final String vid = new AbstractID().toString();
newGraph.getVertices().output(new Utils.CountHelper<Vertex<Long, NullValue>>(vid)).name("count()");
vids.put(executionCounter, vid);

// Generate identifications for the edge counter.
final String eid = new AbstractID().toString();
newGraph.getEdges().output(new Utils.CountHelper<Edge<Long, NullValue>>(eid)).name("count()");
eids.put(executionCounter, eid);


So far I have created 2 sinks in the current iteration in my regular Java program.
Then I execute my graph algorithm with the previous results:

// Execute the graph algorithm and output some of its results
result = newGraph.run(new MyGraphAlgorithm<>(previousResults));
result.sortPartition(1, Order.DESCENDING).setParallelism(1).first(1000).output(outputFormat).name("Store results");
previousResults = result;

This creates one additional sink via the output function.
So for three (3) iterations, I define nine (9) sinks in the plan, call execute() and afterward retrieve the contents of the sinks.
This works fine so far.

If I run with 10 iterations, I will be creating 30 sinks.
The problem is that for 10 iterations, the Flink client program just hangs on the execute() call forever (execution time should increase linearly with the number of iterations).
For 3 iterations, execute() proceeds normally and it takes around 20 seconds per iteration, so 3 iterations is 60 seconds and 10 should be around 3 minutes.
After five hours, there was no progress.

Furthermore, I checked the web monitor periodically and there was not a single job.
It seems that the client program is simply not sending the job to the cluster if the job plan becomes too big.
The exact same compiled program, with 3 iterations (via argument) works, but with 10 (via argument) it simply falls silent.

I am trying to understand what may be the problem:

- An internal limit in the number of datasinks or operators in the plan?

- A limit in message size preventing the client from sending the job?
I have tried increasing the akka.framesize to 256000kB in the Flink server flink-conf.yaml config and in the client program when creating the remote environment with:

Configuration clientConfig = new Configuration();
final String akkaFrameSize = "256000kB";
ConfigOption<String> akkaConfig = ConfigOptions.key(AkkaOptions.FRAMESIZE.key()).defaultValue(akkaFrameSize);
clientConfig.setString(akkaConfig, akkaFrameSize);
env = new RemoteEnvironment(remoteAddress, remotePort, clientConfig, jarFiles, null);


I have run out of ideas with respect to the causes.
Hoping you may be able to shed some light.

Thank you for your time,

Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 29 November 2017 at 10:23, Fabian Hueske <[hidden email]> wrote:
The monitoring REST interface provides detailed stats about a job, its tasks, and processing verticies including their start and end time [1].

However, it is not trivial to make sense of the execution times because Flink uses pipelined shuffles by default.
That means that the execution of multiple operators can overlap. For example the records that are produced by a GroupReduce can be processed by a Map, shuffled, and sorted (for another GroupReduce) in a pipelined fashion.
Hence, all these operations run at the same time. You can disable this behavior to some extend by setting the execution mode to batched shuffles [2].
However, this will likely have a negative impact on the overall execution time.

2017-11-29 0:44 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

You're right, I was overlooking that.
With your suggestion, I now just define a different sink in each iteration of the loop.
Then they all output to disk when executing a single bigger plan.

I have one more question: I know I can retrieve the total time this single job takes to execute, but what if I want to know the time taken for specific operators in the dag?
Is there some functionality in the Flink Batch API akin to counting elements but for measuring time instead?
For example, if I am not mistaken, an operator can be executed in parallel or serially (with a parallelism of one).

Is there a straightforward way to get the time taken by the operator's tasks?
In a way that I could:

a) just get the time of a single task (if running serially) to get the total operator execution time;
b) know the time taken by each parallel component of the operator's execution so I could know where and what was the "lagging element" in the operator's execution.

Is this possible? I was hoping I could retrieve this information in the Java program itself and avoid processing logs.

Thanks again.

Best regards,


Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 28 November 2017 at 08:56, Fabian Hueske <[hidden email]> wrote:
Hi,

by calling result.count(), you compute the complete plan from the beginning and not just the operations you added since the last execution.
Looking at the output you posted, each step takes about 15 seconds (with about 5 secs of initialization).
So the 20 seconds of the first step include initialization + 1st step.
The 35 seconds on the second step include initialization, 1st step + 2nd step.
If you don't call count on the intermediate steps, you can compute the 4th step in 65 seconds.

Implementing a caching operator would be a pretty huge effort because you need to touch code at many places such as the API, optimizer, runtime, scheduling, etc.
The documentation you found should still be applicable. There hasn't been major additions to the DataSet API and runtime in the last releases.

Best, Fabian



2017-11-28 9:14 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thank you for the reply.
I was hoping the situation had in fact changed.

As far as I know, I am not calling execute() directly even once - it is being called implicitly by simple DataSink elements added to the plan through count():

System.out.println(String.format("%d-th graph algorithm produced %d elements. (%d.%d s).",
                            executionCounter,
                            result.count(), // this would trigger execution...
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS),
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS) % 1000));


I have taken a look at Flink's code base (e.g. how the dataflow dag is processed with classes such as  org.apache.flink.optimizer.traversals.GraphCreatingVisitor, org.apache.flink.api.java.operators.OperatorTranslation) but I'm not sure on the most direct way to achieve this.
Perhaps I missed some online documentation that would help to get a grip on how to contribute to the different parts of Flink?

I did find some information which hints at implementing this sort of thing (such as adding custom operators) but it was associated to an old version of Flink:
However, as far as I know there is no equivalent page in the current online stable or snapshot documentation.

What would be the best way to go about this?

It really seems that the DataSet stored in the result variable is always representing an increasing sequence of executions and not just the results of the last execution.



Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 27 November 2017 at 22:56, Fabian Hueske <[hidden email]> wrote:
Hi Miguel,

I'm sorry but AFAIK, the situation has not changed.

Is it possible that you are calling execute() multiple times?
In that case, the 1-st and 2-nd graph would be recomputed before the 3-rd graph is computed.
That would explain the increasing execution time of 15 seconds.

Best, Fabian

2017-11-26 17:45 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

I'm facing a problem in an algorithm where I would like to constantly update a DataSet representing a graph, perform some computation, output one or more DataSink (such as a file on the local system) and then reuse the DataSet for a next iteration.
​I want to avoid spilling the results to disk at the end of an iteration and to read it back in the next iterations - the graph is very big and I do not wish to incur that time overhead.
I want to reuse the full result DataSet of each iteration in the next one and I want to save to disk a small percentage of the produced DataSet for each iteration.
The space complexity is rather constant - the number of edges in the graph increases by only 100 between iterations (which is an extremely low percentage of the original graph's edges) and is obtained using env.fromCollection(edgesToAdd).

Although I am using Flink's Gelly API for graphs, I have no problem working directly with the underlying vertex and edge DataSet elements.​

Two ways to do this occur to me, but it seems both are currently not supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion [1]:

«​
Unfortunately, it is not currently possible to output intermediate results from a bulk iteration.
You can only output the final result at the end of the iteration.
Also, as you correctly noticed, Flink cannot efficiently unroll a while-loop or for-loop, so that won't work either.»


1. I thought I could create a bulk iteration, perform the computation and between iterations, output the result to the file system.
However, this is not possible, as per Vasia's answer, and produces the following exception on execution when I try (for example, to calculate a centrality metric for every vertex and dump the results to disk), as expected based on that information:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

2. Using a for loop in my own program and triggering sequential Flink job executions.
Problem: in this scenario, while I am able to use a DataSet produced in an iteration's Flink job (and dump the desired output information to disk) and pass it to the next Flink job, the computation time increases constantly:
(I also tried manually starting a session which is kept open with env.startNewSession() before the loop - no impact)

​​
Initial graph has 33511 vertices and 411578 edges.
Added 113 vertices and 100 edges.
1-th graph now has 33524 vertices and 411678 edges (2.543 s).
1-th graph algorithm produced 33524 elements. (20.96 s).
Added 222 vertices and 200 edges.
2-th graph now has 33536 vertices and 411778 edges (1.919 s).
2-th
graph algorithm produced 33536 elements. (35.913 s).
Added 326 vertices and 300 edges.
3-th graph now has 33543 vertices and 411878 edges (1.825 s).
3-th
graph algorithm produced 33543 elements. (49.624 s).
Added 436 vertices and 400 edges.
4-th graph now has 33557 vertices and 411978 edges (1.482 s).
4-th
graph algorithm produced 33557 elements. (66.209 s).

Note that the number of elements in the output DataSet is equal to the number of vertices in the graph.
On iteration i in my program, the executed graph algorithm incorporates the result DataSet of iteration i - 1 by means of the g.joinWithVertices(previousResultDataSet, new RanksJoinFunction()) function.

The VertexJoinFunction is a simple forwarding mechanism to set the previous values:

@FunctionAnnotation.ForwardedFieldsFirst("*->*")
private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
    @Override
    public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
        return inputValue;
    }
}


​I have also used Flink's plan visualizer to check for discrepancies between the first iteration and the tenth (for example), but the layout of the plan remains exactly the same while the execution time continually increases for what should be the same amount of computations.

Bottom-line: ​I was hoping someone could tell me how to overcome the performance bottleneck using the sequential job approach or enabling the output of intermediate results using Flink's Bulk Iterations.
​​​I believe others have stumbled upon this limitation before [2, 3].​​
I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit using a local environment:

final Configuration conf = new Configuration();
final LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
final ExecutionEnvironment env = lenv;
env.getConfig().disableSysoutLogging().setParallelism(1);


​I wish to ​execute in a cluster later on with a bigger dataset, so it would be essential that to maximize the ability to reuse the DataSets that are distributed by the Flink runtime.
This would allow me to avoid the performance bottleneck that I described.
​Hopefully someone may shed light on this.​

​Thanks for your attention.​


​References:​




Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra











Reply | Threaded
Open this post in threaded view
|

Re: How to perform efficient DataSet reuse between iterations

Fabian Hueske-2
Hi Miguel,

I just had another idea that you could try. 

The problem seems to be that the plan space that the optimizer enumerates exponentially grows as you add more iterations.
This happens when there are multiple valid execution strategies for a given operator (mostly applied to Joins).
You could try to fix the strategies with optimizer hints [1] and thereby reduce the degrees of freedoms for the optimizer. With fewer degrees of freedom, the optimizer enumerates fewer plans.

I don't know if Gelly allows to set optimizer hints. If not, you should fork the code and add the hints yourself.

If you want to go down the route of touching the optimizer, I won't be able to help.
The optimizer is a pretty complex component and I would have to invest too much time for that.

Best, Fabian



2017-12-07 15:40 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

It really looks like an issue requiring attention.

Since I am launching the client application via Maven, I opted to change the maximum memory setting with
export MAVEN_OPTS="-Xms256m -Xmx40000m".
To give an example, for three (3) iterations it worked fine with around 4 GB of memory, but for ten (10), launching a client with a limit of 40 GB of memory still resulted in the same exception.
The client application did consume memory into the 30-40 GB range before running out of memory.

Since I am working with very big datasets, spilling to disk becomes impractical with respect to conducting experiments.
A dataset of thirty (30) GB would take a lot of time if we were to spill to disk a hundred times.

That is why I was asking about a caching operator or spilling intermediate results in a BulkIteration.
You said the caching operator would not be trivial to implement.
However, would it also be very hard to allow for spilling in Flink's BulkIteration or DeltaIteration?

I am not sure I can use semantic annotations because the program just adds vertices and edges to the graph (Gelly API) and runs an algorithm implementing the GraphAlgorithm interface with ScatterFunction and GatherFunction class extensions.
There are some join functions though, I will look into applying them.

Besides this, can you recommend an initial place in the code where one should look to begin studying the optimizer?

Thanks for your time once more,

Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 6 December 2017 at 10:27, Fabian Hueske <[hidden email]> wrote:
Hmm, this does not look too good.
As I expected, the program gets stuck in the optimizer. Plan optimization can be quite expensive for large plans.

There might be a way to improve the optimization of large plans by cutting the plan space but I would not expect this to be fixed in the near future.
Touching the optimizer is a delicate issue and requires a lot of care and effort.

I would try to increase the heap size of the client JVM (check the /bin/flink file which starts the client JVM).
This should bring down the GC overhead, but the computational complexity of enumerating plans would remain the same.
You might want to have a look at semantic annotations [1]. Adding these to your user functions should have an effect on the plan enumeration.

If this doesn't help, the only solution might be to cut the program into multiple pieces and spill intermediate results to disk.

Best, Fabian


2017-12-06 11:10 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

After increasing the message size akka parameter, the client resulted in the following exception after some time.
This confirms that the JobManager never received the job request:

[WARNING]
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:294)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.flink.optimizer.costs.CostEstimator.costOperator(CostEstimator.java:78)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:516)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:344)
        at org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:193)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:496)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
        at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:349)
        at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:812)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
        at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
        at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
        at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
        at pt.ulisboa.tecnico.graph.util.GraphSequenceTest.optimizedAccumulationLoop(GraphSequenceTest.java:304)
        at pt.ulisboa.tecnico.graph.util.GraphSequenceTest.main(GraphSequenceTest.java:204)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)


I hope I am not hitting a formal limit of Flink?

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 6 December 2017 at 08:48, Fabian Hueske <[hidden email]> wrote:
Hi Miguel,

if the message size would be the problem, the client should fail with an exception.
What might happen, is that the client gets stuck while optimizing the program.

You could take a stacktrace of the client process to identify at which part the client gets stuck.

Best, Fabian

2017-12-06 3:01 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thanks for the help.
I am interested in the duration of specific operators, so the fact that parts of the execution are in pipeline is not a problem for me.
From my understanding, the automated way to approach this is to run the Flink job with the web interface active and then make a REST call on the appropriate job and parse the JSON.
I have implemented that and my program is able to read the duration of specific operators.

However, I'm facing another problem whose cause I haven't been able to pinpoint.

Testing on 1.4.0-SNAPSHOT.

When launching a cluster (start-local.bat on Windows or start-cluster.sh on Linux), the JobManager, the TaskManager(s) are launched and the web front end becomes active (I can access it via browser) - everything is ok so far.
The problem occurs when the number of operators in the plan increases.

Consider that I execute my algorithm three (3) times through a single Flink plan.
For three times, vertices and edges will be added to the graph (via Gelly methods).
This is defined in a for loop in my program. For each iteration:

// I add 100 edges to the graph, decomposed in a list of vertices and edges
final Graph<Long, NullValue, NullValue> newGraph = graph
    .addVertices(verticesToAdd)
    .addEdges(edgesToAdd);
                   
// Generate identifications for the vertex counter.
final String vid = new AbstractID().toString();
newGraph.getVertices().output(new Utils.CountHelper<Vertex<Long, NullValue>>(vid)).name("count()");
vids.put(executionCounter, vid);

// Generate identifications for the edge counter.
final String eid = new AbstractID().toString();
newGraph.getEdges().output(new Utils.CountHelper<Edge<Long, NullValue>>(eid)).name("count()");
eids.put(executionCounter, eid);


So far I have created 2 sinks in the current iteration in my regular Java program.
Then I execute my graph algorithm with the previous results:

// Execute the graph algorithm and output some of its results
result = newGraph.run(new MyGraphAlgorithm<>(previousResults));
result.sortPartition(1, Order.DESCENDING).setParallelism(1).first(1000).output(outputFormat).name("Store results");
previousResults = result;

This creates one additional sink via the output function.
So for three (3) iterations, I define nine (9) sinks in the plan, call execute() and afterward retrieve the contents of the sinks.
This works fine so far.

If I run with 10 iterations, I will be creating 30 sinks.
The problem is that for 10 iterations, the Flink client program just hangs on the execute() call forever (execution time should increase linearly with the number of iterations).
For 3 iterations, execute() proceeds normally and it takes around 20 seconds per iteration, so 3 iterations is 60 seconds and 10 should be around 3 minutes.
After five hours, there was no progress.

Furthermore, I checked the web monitor periodically and there was not a single job.
It seems that the client program is simply not sending the job to the cluster if the job plan becomes too big.
The exact same compiled program, with 3 iterations (via argument) works, but with 10 (via argument) it simply falls silent.

I am trying to understand what may be the problem:

- An internal limit in the number of datasinks or operators in the plan?

- A limit in message size preventing the client from sending the job?
I have tried increasing the akka.framesize to 256000kB in the Flink server flink-conf.yaml config and in the client program when creating the remote environment with:

Configuration clientConfig = new Configuration();
final String akkaFrameSize = "256000kB";
ConfigOption<String> akkaConfig = ConfigOptions.key(AkkaOptions.FRAMESIZE.key()).defaultValue(akkaFrameSize);
clientConfig.setString(akkaConfig, akkaFrameSize);
env = new RemoteEnvironment(remoteAddress, remotePort, clientConfig, jarFiles, null);


I have run out of ideas with respect to the causes.
Hoping you may be able to shed some light.

Thank you for your time,

Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 29 November 2017 at 10:23, Fabian Hueske <[hidden email]> wrote:
The monitoring REST interface provides detailed stats about a job, its tasks, and processing verticies including their start and end time [1].

However, it is not trivial to make sense of the execution times because Flink uses pipelined shuffles by default.
That means that the execution of multiple operators can overlap. For example the records that are produced by a GroupReduce can be processed by a Map, shuffled, and sorted (for another GroupReduce) in a pipelined fashion.
Hence, all these operations run at the same time. You can disable this behavior to some extend by setting the execution mode to batched shuffles [2].
However, this will likely have a negative impact on the overall execution time.

2017-11-29 0:44 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

You're right, I was overlooking that.
With your suggestion, I now just define a different sink in each iteration of the loop.
Then they all output to disk when executing a single bigger plan.

I have one more question: I know I can retrieve the total time this single job takes to execute, but what if I want to know the time taken for specific operators in the dag?
Is there some functionality in the Flink Batch API akin to counting elements but for measuring time instead?
For example, if I am not mistaken, an operator can be executed in parallel or serially (with a parallelism of one).

Is there a straightforward way to get the time taken by the operator's tasks?
In a way that I could:

a) just get the time of a single task (if running serially) to get the total operator execution time;
b) know the time taken by each parallel component of the operator's execution so I could know where and what was the "lagging element" in the operator's execution.

Is this possible? I was hoping I could retrieve this information in the Java program itself and avoid processing logs.

Thanks again.

Best regards,


Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 28 November 2017 at 08:56, Fabian Hueske <[hidden email]> wrote:
Hi,

by calling result.count(), you compute the complete plan from the beginning and not just the operations you added since the last execution.
Looking at the output you posted, each step takes about 15 seconds (with about 5 secs of initialization).
So the 20 seconds of the first step include initialization + 1st step.
The 35 seconds on the second step include initialization, 1st step + 2nd step.
If you don't call count on the intermediate steps, you can compute the 4th step in 65 seconds.

Implementing a caching operator would be a pretty huge effort because you need to touch code at many places such as the API, optimizer, runtime, scheduling, etc.
The documentation you found should still be applicable. There hasn't been major additions to the DataSet API and runtime in the last releases.

Best, Fabian



2017-11-28 9:14 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello Fabian,

Thank you for the reply.
I was hoping the situation had in fact changed.

As far as I know, I am not calling execute() directly even once - it is being called implicitly by simple DataSink elements added to the plan through count():

System.out.println(String.format("%d-th graph algorithm produced %d elements. (%d.%d s).",
                            executionCounter,
                            result.count(), // this would trigger execution...
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS),
                            env.getLastJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS) % 1000));


I have taken a look at Flink's code base (e.g. how the dataflow dag is processed with classes such as  org.apache.flink.optimizer.traversals.GraphCreatingVisitor, org.apache.flink.api.java.operators.OperatorTranslation) but I'm not sure on the most direct way to achieve this.
Perhaps I missed some online documentation that would help to get a grip on how to contribute to the different parts of Flink?

I did find some information which hints at implementing this sort of thing (such as adding custom operators) but it was associated to an old version of Flink:
However, as far as I know there is no equivalent page in the current online stable or snapshot documentation.

What would be the best way to go about this?

It really seems that the DataSet stored in the result variable is always representing an increasing sequence of executions and not just the results of the last execution.



Best regards,

Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra


On 27 November 2017 at 22:56, Fabian Hueske <[hidden email]> wrote:
Hi Miguel,

I'm sorry but AFAIK, the situation has not changed.

Is it possible that you are calling execute() multiple times?
In that case, the 1-st and 2-nd graph would be recomputed before the 3-rd graph is computed.
That would explain the increasing execution time of 15 seconds.

Best, Fabian

2017-11-26 17:45 GMT+01:00 Miguel Coimbra <[hidden email]>:
Hello,

I'm facing a problem in an algorithm where I would like to constantly update a DataSet representing a graph, perform some computation, output one or more DataSink (such as a file on the local system) and then reuse the DataSet for a next iteration.
​I want to avoid spilling the results to disk at the end of an iteration and to read it back in the next iterations - the graph is very big and I do not wish to incur that time overhead.
I want to reuse the full result DataSet of each iteration in the next one and I want to save to disk a small percentage of the produced DataSet for each iteration.
The space complexity is rather constant - the number of edges in the graph increases by only 100 between iterations (which is an extremely low percentage of the original graph's edges) and is obtained using env.fromCollection(edgesToAdd).

Although I am using Flink's Gelly API for graphs, I have no problem working directly with the underlying vertex and edge DataSet elements.​

Two ways to do this occur to me, but it seems both are currently not supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion [1]:

«​
Unfortunately, it is not currently possible to output intermediate results from a bulk iteration.
You can only output the final result at the end of the iteration.
Also, as you correctly noticed, Flink cannot efficiently unroll a while-loop or for-loop, so that won't work either.»


1. I thought I could create a bulk iteration, perform the computation and between iterations, output the result to the file system.
However, this is not possible, as per Vasia's answer, and produces the following exception on execution when I try (for example, to calculate a centrality metric for every vertex and dump the results to disk), as expected based on that information:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

2. Using a for loop in my own program and triggering sequential Flink job executions.
Problem: in this scenario, while I am able to use a DataSet produced in an iteration's Flink job (and dump the desired output information to disk) and pass it to the next Flink job, the computation time increases constantly:
(I also tried manually starting a session which is kept open with env.startNewSession() before the loop - no impact)

​​
Initial graph has 33511 vertices and 411578 edges.
Added 113 vertices and 100 edges.
1-th graph now has 33524 vertices and 411678 edges (2.543 s).
1-th graph algorithm produced 33524 elements. (20.96 s).
Added 222 vertices and 200 edges.
2-th graph now has 33536 vertices and 411778 edges (1.919 s).
2-th
graph algorithm produced 33536 elements. (35.913 s).
Added 326 vertices and 300 edges.
3-th graph now has 33543 vertices and 411878 edges (1.825 s).
3-th
graph algorithm produced 33543 elements. (49.624 s).
Added 436 vertices and 400 edges.
4-th graph now has 33557 vertices and 411978 edges (1.482 s).
4-th
graph algorithm produced 33557 elements. (66.209 s).

Note that the number of elements in the output DataSet is equal to the number of vertices in the graph.
On iteration i in my program, the executed graph algorithm incorporates the result DataSet of iteration i - 1 by means of the g.joinWithVertices(previousResultDataSet, new RanksJoinFunction()) function.

The VertexJoinFunction is a simple forwarding mechanism to set the previous values:

@FunctionAnnotation.ForwardedFieldsFirst("*->*")
private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
    @Override
    public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
        return inputValue;
    }
}


​I have also used Flink's plan visualizer to check for discrepancies between the first iteration and the tenth (for example), but the layout of the plan remains exactly the same while the execution time continually increases for what should be the same amount of computations.

Bottom-line: ​I was hoping someone could tell me how to overcome the performance bottleneck using the sequential job approach or enabling the output of intermediate results using Flink's Bulk Iterations.
​​​I believe others have stumbled upon this limitation before [2, 3].​​
I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit using a local environment:

final Configuration conf = new Configuration();
final LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
final ExecutionEnvironment env = lenv;
env.getConfig().disableSysoutLogging().setParallelism(1);


​I wish to ​execute in a cluster later on with a bigger dataset, so it would be essential that to maximize the ability to reuse the DataSets that are distributed by the Flink runtime.
This would allow me to avoid the performance bottleneck that I described.
​Hopefully someone may shed light on this.​

​Thanks for your attention.​


​References:​




Miguel E. Coimbra
Email: [hidden email]
Skype: miguel.e.coimbra