intermediate result reuse

classic Classic list List threaded Threaded
10 messages Options
Reply | Threaded
Open this post in threaded view
|

intermediate result reuse

Michele Bertoni
Hi everybody, I have a question about internal optimization
is flink able to reuse intermediate result that are used twice in the graph?

i.e.
a = readsource -> filter -> reduce -> something else even more complicated

b = a filter(something)
store b

c = a filter(something else)
store c

what happens to a? is it computed twice?

in my read function I have a some logging commands and I see the printed twice, but it sounds strange to me



thanks
cheers
michele
Reply | Threaded
Open this post in threaded view
|

Re: intermediate result reuse

Fabian Hueske-2
Hi Michele,

Flink programs can have multiple sinks.
In your program, the intermediate result a will be streamed to both filters (b and c) at the same time and both sinks will be written at the same time.
So in this case, there is no need to materialize the intermediate result a.

If you call execute() after you defined b, the program will compute a and stream the result only to b.
If you call execute() again after you defined c, the program will compute a again and stream the result to c.

Summary:
Flink programs can usually stream intermediate results without materializing them. There are a few cases where it needs to materialize intermediate results in order to avoid deadlocks, but these are fully transparently handled.
It is not possible (yet!) to share results across program executions, i.e., whenever you call execute().

I suppose, you call execute() between defining b and c. If you execute that call, a will be computed once and both b and c are computed at the same time.

Best, Fabian

2015-09-12 11:02 GMT+02:00 Michele Bertoni <[hidden email]>:
Hi everybody, I have a question about internal optimization
is flink able to reuse intermediate result that are used twice in the graph?

i.e.
a = readsource -> filter -> reduce -> something else even more complicated

b = a filter(something)
store b

c = a filter(something else)
store c

what happens to a? is it computed twice?

in my read function I have a some logging commands and I see the printed twice, but it sounds strange to me



thanks
cheers
michele

Reply | Threaded
Open this post in threaded view
|

Re: intermediate result reuse

Stephan Ewen
Fabian has explained it well. All functions are executed lazily as one DAG, when "env.execute()" is called.

Beware that there are three exceptions:
  - count()
  - collect()
  - print()

These functions trigger an immediate program execution (they are "eager" functions). They will execute all that is needed for produce their result. Summing up:

---------------------------

One execution in this case (result "a" is reused by "b" and "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a.flatmap()
c = a.groupBy() -> reduce()

b.writeAsText()
c.writeAsCsv()

env.execute();

---------------------------

Two executions in this case ("a" is computed twice, once for "b" and once for "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a -> flatmap() -> count()
c = a -> groupBy() -> reduce().collect()

---------------------------

Greetings,
Stephan


On Sat, Sep 12, 2015 at 11:31 AM, Fabian Hueske <[hidden email]> wrote:
Hi Michele,

Flink programs can have multiple sinks.
In your program, the intermediate result a will be streamed to both filters (b and c) at the same time and both sinks will be written at the same time.
So in this case, there is no need to materialize the intermediate result a.

If you call execute() after you defined b, the program will compute a and stream the result only to b.
If you call execute() again after you defined c, the program will compute a again and stream the result to c.

Summary:
Flink programs can usually stream intermediate results without materializing them. There are a few cases where it needs to materialize intermediate results in order to avoid deadlocks, but these are fully transparently handled.
It is not possible (yet!) to share results across program executions, i.e., whenever you call execute().

I suppose, you call execute() between defining b and c. If you execute that call, a will be computed once and both b and c are computed at the same time.

Best, Fabian

2015-09-12 11:02 GMT+02:00 Michele Bertoni <[hidden email]>:
Hi everybody, I have a question about internal optimization
is flink able to reuse intermediate result that are used twice in the graph?

i.e.
a = readsource -> filter -> reduce -> something else even more complicated

b = a filter(something)
store b

c = a filter(something else)
store c

what happens to a? is it computed twice?

in my read function I have a some logging commands and I see the printed twice, but it sounds strange to me



thanks
cheers
michele


Reply | Threaded
Open this post in threaded view
|

Re: intermediate result reuse

Michele Bertoni
ok, I think I got the point: I don’t have two execute but a collect in some branch
I will look for a way to remove it


What I am doing is to keep all the elements of A that as value equal to something in B, where B (at this point) is very small
Is it better to collect or a cogroup?


btw is something you expect to solve i further versions?


thanks
michele




Il giorno 12/set/2015, alle ore 16:27, Stephan Ewen <[hidden email]> ha scritto:

Fabian has explained it well. All functions are executed lazily as one DAG, when "env.execute()" is called.

Beware that there are three exceptions:
  - count()
  - collect()
  - print()

These functions trigger an immediate program execution (they are "eager" functions). They will execute all that is needed for produce their result. Summing up:

---------------------------

One execution in this case (result "a" is reused by "b" and "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a.flatmap()
c = a.groupBy() -> reduce()

b.writeAsText()
c.writeAsCsv()

env.execute();

---------------------------

Two executions in this case ("a" is computed twice, once for "b" and once for "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a -> flatmap() -> count()
c = a -> groupBy() -> reduce().collect()

---------------------------

Greetings,
Stephan


On Sat, Sep 12, 2015 at 11:31 AM, Fabian Hueske <[hidden email]> wrote:
Hi Michele,

Flink programs can have multiple sinks.
In your program, the intermediate result a will be streamed to both filters (b and c) at the same time and both sinks will be written at the same time.
So in this case, there is no need to materialize the intermediate result a.

If you call execute() after you defined b, the program will compute a and stream the result only to b.
If you call execute() again after you defined c, the program will compute a again and stream the result to c.

Summary:
Flink programs can usually stream intermediate results without materializing them. There are a few cases where it needs to materialize intermediate results in order to avoid deadlocks, but these are fully transparently handled.
It is not possible (yet!) to share results across program executions, i.e., whenever you call execute().

I suppose, you call execute() between defining b and c. If you execute that call, a will be computed once and both b and c are computed at the same time.

Best, Fabian

2015-09-12 11:02 GMT+02:00 Michele Bertoni <[hidden email]>:
Hi everybody, I have a question about internal optimization
is flink able to reuse intermediate result that are used twice in the graph?

i.e.
a = readsource -> filter -> reduce -> something else even more complicated

b = a filter(something)
store b

c = a filter(something else)
store c

what happens to a? is it computed twice?

in my read function I have a some logging commands and I see the printed twice, but it sounds strange to me



thanks
cheers
michele



Reply | Threaded
Open this post in threaded view
|

Re: intermediate result reuse

Stephan Ewen
Hi!

In most places where you use collect(), you should be able to use a broadcast variable to the same extend. This keeps the plan as one DAG, executed in one unit, so no re-computation will happen.

Intermediate result caching is actually a work that has been in progress for a while, but has stalled for a bit due to prioritization of some other issues. It will be resumed in the near future, definitely. Too many parts are already in place to not complete this feature...

Greetings,
Stephan


On Sat, Sep 12, 2015 at 6:44 PM, Michele Bertoni <[hidden email]> wrote:
ok, I think I got the point: I don’t have two execute but a collect in some branch
I will look for a way to remove it


What I am doing is to keep all the elements of A that as value equal to something in B, where B (at this point) is very small
Is it better to collect or a cogroup?


btw is something you expect to solve i further versions?


thanks
michele




Il giorno 12/set/2015, alle ore 16:27, Stephan Ewen <[hidden email]> ha scritto:

Fabian has explained it well. All functions are executed lazily as one DAG, when "env.execute()" is called.

Beware that there are three exceptions:
  - count()
  - collect()
  - print()

These functions trigger an immediate program execution (they are "eager" functions). They will execute all that is needed for produce their result. Summing up:

---------------------------

One execution in this case (result "a" is reused by "b" and "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a.flatmap()
c = a.groupBy() -> reduce()

b.writeAsText()
c.writeAsCsv()

env.execute();

---------------------------

Two executions in this case ("a" is computed twice, once for "b" and once for "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a -> flatmap() -> count()
c = a -> groupBy() -> reduce().collect()

---------------------------

Greetings,
Stephan


On Sat, Sep 12, 2015 at 11:31 AM, Fabian Hueske <[hidden email]> wrote:
Hi Michele,

Flink programs can have multiple sinks.
In your program, the intermediate result a will be streamed to both filters (b and c) at the same time and both sinks will be written at the same time.
So in this case, there is no need to materialize the intermediate result a.

If you call execute() after you defined b, the program will compute a and stream the result only to b.
If you call execute() again after you defined c, the program will compute a again and stream the result to c.

Summary:
Flink programs can usually stream intermediate results without materializing them. There are a few cases where it needs to materialize intermediate results in order to avoid deadlocks, but these are fully transparently handled.
It is not possible (yet!) to share results across program executions, i.e., whenever you call execute().

I suppose, you call execute() between defining b and c. If you execute that call, a will be computed once and both b and c are computed at the same time.

Best, Fabian

2015-09-12 11:02 GMT+02:00 Michele Bertoni <[hidden email]>:
Hi everybody, I have a question about internal optimization
is flink able to reuse intermediate result that are used twice in the graph?

i.e.
a = readsource -> filter -> reduce -> something else even more complicated

b = a filter(something)
store b

c = a filter(something else)
store c

what happens to a? is it computed twice?

in my read function I have a some logging commands and I see the printed twice, but it sounds strange to me



thanks
cheers
michele




Reply | Threaded
Open this post in threaded view
|

Re: intermediate result reuse

Michele Bertoni
Hi Stephan,
I have one more question: what happens when I do collect inside a cogroup (i.e. doing an outer join) or in a groupreduce? 


Il giorno 13/set/2015, alle ore 02:13, Stephan Ewen <[hidden email]> ha scritto:

Hi!

In most places where you use collect(), you should be able to use a broadcast variable to the same extend. This keeps the plan as one DAG, executed in one unit, so no re-computation will happen.

Intermediate result caching is actually a work that has been in progress for a while, but has stalled for a bit due to prioritization of some other issues. It will be resumed in the near future, definitely. Too many parts are already in place to not complete this feature...

Greetings,
Stephan


On Sat, Sep 12, 2015 at 6:44 PM, Michele Bertoni <[hidden email]> wrote:
ok, I think I got the point: I don’t have two execute but a collect in some branch
I will look for a way to remove it


What I am doing is to keep all the elements of A that as value equal to something in B, where B (at this point) is very small
Is it better to collect or a cogroup?


btw is something you expect to solve i further versions?


thanks
michele




Il giorno 12/set/2015, alle ore 16:27, Stephan Ewen <[hidden email]> ha scritto:

Fabian has explained it well. All functions are executed lazily as one DAG, when "env.execute()" is called.

Beware that there are three exceptions:
  - count()
  - collect()
  - print()

These functions trigger an immediate program execution (they are "eager" functions). They will execute all that is needed for produce their result. Summing up:

---------------------------

One execution in this case (result "a" is reused by "b" and "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a.flatmap()
c = a.groupBy() -> reduce()

b.writeAsText()
c.writeAsCsv()

env.execute();

---------------------------

Two executions in this case ("a" is computed twice, once for "b" and once for "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a -> flatmap() -> count()
c = a -> groupBy() -> reduce().collect()

---------------------------

Greetings,
Stephan


On Sat, Sep 12, 2015 at 11:31 AM, Fabian Hueske <[hidden email]> wrote:
Hi Michele,

Flink programs can have multiple sinks.
In your program, the intermediate result a will be streamed to both filters (b and c) at the same time and both sinks will be written at the same time.
So in this case, there is no need to materialize the intermediate result a.

If you call execute() after you defined b, the program will compute a and stream the result only to b.
If you call execute() again after you defined c, the program will compute a again and stream the result to c.

Summary:
Flink programs can usually stream intermediate results without materializing them. There are a few cases where it needs to materialize intermediate results in order to avoid deadlocks, but these are fully transparently handled.
It is not possible (yet!) to share results across program executions, i.e., whenever you call execute().

I suppose, you call execute() between defining b and c. If you execute that call, a will be computed once and both b and c are computed at the same time.

Best, Fabian

2015-09-12 11:02 GMT+02:00 Michele Bertoni <[hidden email]>:
Hi everybody, I have a question about internal optimization
is flink able to reuse intermediate result that are used twice in the graph?

i.e.
a = readsource -> filter -> reduce -> something else even more complicated

b = a filter(something)
store b

c = a filter(something else)
store c

what happens to a? is it computed twice?

in my read function I have a some logging commands and I see the printed twice, but it sounds strange to me



thanks
cheers
michele





Reply | Threaded
Open this post in threaded view
|

Re: intermediate result reuse

Fabian Hueske-2
Hi Michele,

collect on DataSet and collect on a Collector within a Function are two different things and have the same name by coincidence (actually, this is the first time I noticed that).

DataSet.collect() fetches a DataSet which can be distributed across several TaskManagers (in a cluster) to you local client program.
Collector.collect() adds a value to the result of a function call. The collector is used in function that can return an arbitrary number of results (0 to n).

Best,
Fabian

2015-09-14 20:58 GMT+02:00 Michele Bertoni <[hidden email]>:
Hi Stephan,
I have one more question: what happens when I do collect inside a cogroup (i.e. doing an outer join) or in a groupreduce? 



Il giorno 13/set/2015, alle ore 02:13, Stephan Ewen <[hidden email]> ha scritto:

Hi!

In most places where you use collect(), you should be able to use a broadcast variable to the same extend. This keeps the plan as one DAG, executed in one unit, so no re-computation will happen.

Intermediate result caching is actually a work that has been in progress for a while, but has stalled for a bit due to prioritization of some other issues. It will be resumed in the near future, definitely. Too many parts are already in place to not complete this feature...

Greetings,
Stephan


On Sat, Sep 12, 2015 at 6:44 PM, Michele Bertoni <[hidden email]> wrote:
ok, I think I got the point: I don’t have two execute but a collect in some branch
I will look for a way to remove it


What I am doing is to keep all the elements of A that as value equal to something in B, where B (at this point) is very small
Is it better to collect or a cogroup?


btw is something you expect to solve i further versions?


thanks
michele




Il giorno 12/set/2015, alle ore 16:27, Stephan Ewen <[hidden email]> ha scritto:

Fabian has explained it well. All functions are executed lazily as one DAG, when "env.execute()" is called.

Beware that there are three exceptions:
  - count()
  - collect()
  - print()

These functions trigger an immediate program execution (they are "eager" functions). They will execute all that is needed for produce their result. Summing up:

---------------------------

One execution in this case (result "a" is reused by "b" and "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a.flatmap()
c = a.groupBy() -> reduce()

b.writeAsText()
c.writeAsCsv()

env.execute();

---------------------------

Two executions in this case ("a" is computed twice, once for "b" and once for "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a -> flatmap() -> count()
c = a -> groupBy() -> reduce().collect()

---------------------------

Greetings,
Stephan


On Sat, Sep 12, 2015 at 11:31 AM, Fabian Hueske <[hidden email]> wrote:
Hi Michele,

Flink programs can have multiple sinks.
In your program, the intermediate result a will be streamed to both filters (b and c) at the same time and both sinks will be written at the same time.
So in this case, there is no need to materialize the intermediate result a.

If you call execute() after you defined b, the program will compute a and stream the result only to b.
If you call execute() again after you defined c, the program will compute a again and stream the result to c.

Summary:
Flink programs can usually stream intermediate results without materializing them. There are a few cases where it needs to materialize intermediate results in order to avoid deadlocks, but these are fully transparently handled.
It is not possible (yet!) to share results across program executions, i.e., whenever you call execute().

I suppose, you call execute() between defining b and c. If you execute that call, a will be computed once and both b and c are computed at the same time.

Best, Fabian

2015-09-12 11:02 GMT+02:00 Michele Bertoni <[hidden email]>:
Hi everybody, I have a question about internal optimization
is flink able to reuse intermediate result that are used twice in the graph?

i.e.
a = readsource -> filter -> reduce -> something else even more complicated

b = a filter(something)
store b

c = a filter(something else)
store c

what happens to a? is it computed twice?

in my read function I have a some logging commands and I see the printed twice, but it sounds strange to me



thanks
cheers
michele






Reply | Threaded
Open this post in threaded view
|

Re: intermediate result reuse

Michele Bertoni
sorry i was not talking about that collect, I know what a collector is
I was talking about the outer join case where inside a cogroup you should do a ToSet on left or right side and collect it to be traversable multiple times

with a toSet it is transforming (something like) a lazy iterator to a list in memory: is it actually collecting something thus stopping execution or is it something different?





Il giorno 14/set/2015, alle ore 22:18, Fabian Hueske <[hidden email]> ha scritto:

Hi Michele,

collect on DataSet and collect on a Collector within a Function are two different things and have the same name by coincidence (actually, this is the first time I noticed that).

DataSet.collect() fetches a DataSet which can be distributed across several TaskManagers (in a cluster) to you local client program.
Collector.collect() adds a value to the result of a function call. The collector is used in function that can return an arbitrary number of results (0 to n).

Best,
Fabian

2015-09-14 20:58 GMT+02:00 Michele Bertoni <[hidden email]>:
Hi Stephan,
I have one more question: what happens when I do collect inside a cogroup (i.e. doing an outer join) or in a groupreduce? 



Il giorno 13/set/2015, alle ore 02:13, Stephan Ewen <[hidden email]> ha scritto:

Hi!

In most places where you use collect(), you should be able to use a broadcast variable to the same extend. This keeps the plan as one DAG, executed in one unit, so no re-computation will happen.

Intermediate result caching is actually a work that has been in progress for a while, but has stalled for a bit due to prioritization of some other issues. It will be resumed in the near future, definitely. Too many parts are already in place to not complete this feature...

Greetings,
Stephan


On Sat, Sep 12, 2015 at 6:44 PM, Michele Bertoni <[hidden email]> wrote:
ok, I think I got the point: I don’t have two execute but a collect in some branch
I will look for a way to remove it


What I am doing is to keep all the elements of A that as value equal to something in B, where B (at this point) is very small
Is it better to collect or a cogroup?


btw is something you expect to solve i further versions?


thanks
michele




Il giorno 12/set/2015, alle ore 16:27, Stephan Ewen <[hidden email]> ha scritto:

Fabian has explained it well. All functions are executed lazily as one DAG, when "env.execute()" is called.

Beware that there are three exceptions:
  - count()
  - collect()
  - print()

These functions trigger an immediate program execution (they are "eager" functions). They will execute all that is needed for produce their result. Summing up:

---------------------------

One execution in this case (result "a" is reused by "b" and "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a.flatmap()
c = a.groupBy() -> reduce()

b.writeAsText()
c.writeAsCsv()

env.execute();

---------------------------

Two executions in this case ("a" is computed twice, once for "b" and once for "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a -> flatmap() -> count()
c = a -> groupBy() -> reduce().collect()

---------------------------

Greetings,
Stephan


On Sat, Sep 12, 2015 at 11:31 AM, Fabian Hueske <[hidden email]> wrote:
Hi Michele,

Flink programs can have multiple sinks.
In your program, the intermediate result a will be streamed to both filters (b and c) at the same time and both sinks will be written at the same time.
So in this case, there is no need to materialize the intermediate result a.

If you call execute() after you defined b, the program will compute a and stream the result only to b.
If you call execute() again after you defined c, the program will compute a again and stream the result to c.

Summary:
Flink programs can usually stream intermediate results without materializing them. There are a few cases where it needs to materialize intermediate results in order to avoid deadlocks, but these are fully transparently handled.
It is not possible (yet!) to share results across program executions, i.e., whenever you call execute().

I suppose, you call execute() between defining b and c. If you execute that call, a will be computed once and both b and c are computed at the same time.

Best, Fabian

2015-09-12 11:02 GMT+02:00 Michele Bertoni <[hidden email]>:
Hi everybody, I have a question about internal optimization
is flink able to reuse intermediate result that are used twice in the graph?

i.e.
a = readsource -> filter -> reduce -> something else even more complicated

b = a filter(something)
store b

c = a filter(something else)
store c

what happens to a? is it computed twice?

in my read function I have a some logging commands and I see the printed twice, but it sounds strange to me



thanks
cheers
michele







Reply | Threaded
Open this post in threaded view
|

Re: intermediate result reuse

Fabian Hueske-2
Ah, sorry :-)
toSet, toList, etc. are regular methods of Scala's Iterator API [1] and not part of Flink's API although the concrete iterator is provided by Flink. I am not a Scala expert, but I think it will eagerly fetch the contents of the function's iterator into a set (or list). This call is part of the user function and executed just like any other call.

[1] http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.Iterator

2015-09-14 22:26 GMT+02:00 Michele Bertoni <[hidden email]>:
sorry i was not talking about that collect, I know what a collector is
I was talking about the outer join case where inside a cogroup you should do a ToSet on left or right side and collect it to be traversable multiple times

with a toSet it is transforming (something like) a lazy iterator to a list in memory: is it actually collecting something thus stopping execution or is it something different?





Il giorno 14/set/2015, alle ore 22:18, Fabian Hueske <[hidden email]> ha scritto:

Hi Michele,

collect on DataSet and collect on a Collector within a Function are two different things and have the same name by coincidence (actually, this is the first time I noticed that).

DataSet.collect() fetches a DataSet which can be distributed across several TaskManagers (in a cluster) to you local client program.
Collector.collect() adds a value to the result of a function call. The collector is used in function that can return an arbitrary number of results (0 to n).

Best,
Fabian

2015-09-14 20:58 GMT+02:00 Michele Bertoni <[hidden email]>:
Hi Stephan,
I have one more question: what happens when I do collect inside a cogroup (i.e. doing an outer join) or in a groupreduce? 



Il giorno 13/set/2015, alle ore 02:13, Stephan Ewen <[hidden email]> ha scritto:

Hi!

In most places where you use collect(), you should be able to use a broadcast variable to the same extend. This keeps the plan as one DAG, executed in one unit, so no re-computation will happen.

Intermediate result caching is actually a work that has been in progress for a while, but has stalled for a bit due to prioritization of some other issues. It will be resumed in the near future, definitely. Too many parts are already in place to not complete this feature...

Greetings,
Stephan


On Sat, Sep 12, 2015 at 6:44 PM, Michele Bertoni <[hidden email]> wrote:
ok, I think I got the point: I don’t have two execute but a collect in some branch
I will look for a way to remove it


What I am doing is to keep all the elements of A that as value equal to something in B, where B (at this point) is very small
Is it better to collect or a cogroup?


btw is something you expect to solve i further versions?


thanks
michele




Il giorno 12/set/2015, alle ore 16:27, Stephan Ewen <[hidden email]> ha scritto:

Fabian has explained it well. All functions are executed lazily as one DAG, when "env.execute()" is called.

Beware that there are three exceptions:
  - count()
  - collect()
  - print()

These functions trigger an immediate program execution (they are "eager" functions). They will execute all that is needed for produce their result. Summing up:

---------------------------

One execution in this case (result "a" is reused by "b" and "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a.flatmap()
c = a.groupBy() -> reduce()

b.writeAsText()
c.writeAsCsv()

env.execute();

---------------------------

Two executions in this case ("a" is computed twice, once for "b" and once for "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a -> flatmap() -> count()
c = a -> groupBy() -> reduce().collect()

---------------------------

Greetings,
Stephan


On Sat, Sep 12, 2015 at 11:31 AM, Fabian Hueske <[hidden email]> wrote:
Hi Michele,

Flink programs can have multiple sinks.
In your program, the intermediate result a will be streamed to both filters (b and c) at the same time and both sinks will be written at the same time.
So in this case, there is no need to materialize the intermediate result a.

If you call execute() after you defined b, the program will compute a and stream the result only to b.
If you call execute() again after you defined c, the program will compute a again and stream the result to c.

Summary:
Flink programs can usually stream intermediate results without materializing them. There are a few cases where it needs to materialize intermediate results in order to avoid deadlocks, but these are fully transparently handled.
It is not possible (yet!) to share results across program executions, i.e., whenever you call execute().

I suppose, you call execute() between defining b and c. If you execute that call, a will be computed once and both b and c are computed at the same time.

Best, Fabian

2015-09-12 11:02 GMT+02:00 Michele Bertoni <[hidden email]>:
Hi everybody, I have a question about internal optimization
is flink able to reuse intermediate result that are used twice in the graph?

i.e.
a = readsource -> filter -> reduce -> something else even more complicated

b = a filter(something)
store b

c = a filter(something else)
store c

what happens to a? is it computed twice?

in my read function I have a some logging commands and I see the printed twice, but it sounds strange to me



thanks
cheers
michele








Reply | Threaded
Open this post in threaded view
|

Re: intermediate result reuse

Stephan Ewen
ToSet should be good to use.

By default, the Iterators stream data (across memory, network, and disk), which allows you to use very large groups (larger than memory).

With ToSet, your group naturally has to fit into memory. But in most cases it will ;-)

On Mon, Sep 14, 2015 at 11:06 PM, Fabian Hueske <[hidden email]> wrote:
Ah, sorry :-)
toSet, toList, etc. are regular methods of Scala's Iterator API [1] and not part of Flink's API although the concrete iterator is provided by Flink. I am not a Scala expert, but I think it will eagerly fetch the contents of the function's iterator into a set (or list). This call is part of the user function and executed just like any other call.

[1] http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.Iterator


2015-09-14 22:26 GMT+02:00 Michele Bertoni <[hidden email]>:
sorry i was not talking about that collect, I know what a collector is
I was talking about the outer join case where inside a cogroup you should do a ToSet on left or right side and collect it to be traversable multiple times

with a toSet it is transforming (something like) a lazy iterator to a list in memory: is it actually collecting something thus stopping execution or is it something different?





Il giorno 14/set/2015, alle ore 22:18, Fabian Hueske <[hidden email]> ha scritto:

Hi Michele,

collect on DataSet and collect on a Collector within a Function are two different things and have the same name by coincidence (actually, this is the first time I noticed that).

DataSet.collect() fetches a DataSet which can be distributed across several TaskManagers (in a cluster) to you local client program.
Collector.collect() adds a value to the result of a function call. The collector is used in function that can return an arbitrary number of results (0 to n).

Best,
Fabian

2015-09-14 20:58 GMT+02:00 Michele Bertoni <[hidden email]>:
Hi Stephan,
I have one more question: what happens when I do collect inside a cogroup (i.e. doing an outer join) or in a groupreduce? 



Il giorno 13/set/2015, alle ore 02:13, Stephan Ewen <[hidden email]> ha scritto:

Hi!

In most places where you use collect(), you should be able to use a broadcast variable to the same extend. This keeps the plan as one DAG, executed in one unit, so no re-computation will happen.

Intermediate result caching is actually a work that has been in progress for a while, but has stalled for a bit due to prioritization of some other issues. It will be resumed in the near future, definitely. Too many parts are already in place to not complete this feature...

Greetings,
Stephan


On Sat, Sep 12, 2015 at 6:44 PM, Michele Bertoni <[hidden email]> wrote:
ok, I think I got the point: I don’t have two execute but a collect in some branch
I will look for a way to remove it


What I am doing is to keep all the elements of A that as value equal to something in B, where B (at this point) is very small
Is it better to collect or a cogroup?


btw is something you expect to solve i further versions?


thanks
michele




Il giorno 12/set/2015, alle ore 16:27, Stephan Ewen <[hidden email]> ha scritto:

Fabian has explained it well. All functions are executed lazily as one DAG, when "env.execute()" is called.

Beware that there are three exceptions:
  - count()
  - collect()
  - print()

These functions trigger an immediate program execution (they are "eager" functions). They will execute all that is needed for produce their result. Summing up:

---------------------------

One execution in this case (result "a" is reused by "b" and "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a.flatmap()
c = a.groupBy() -> reduce()

b.writeAsText()
c.writeAsCsv()

env.execute();

---------------------------

Two executions in this case ("a" is computed twice, once for "b" and once for "c")

a = env.createInput() -> map() -> reduce() -> filter()

b = a -> flatmap() -> count()
c = a -> groupBy() -> reduce().collect()

---------------------------

Greetings,
Stephan


On Sat, Sep 12, 2015 at 11:31 AM, Fabian Hueske <[hidden email]> wrote:
Hi Michele,

Flink programs can have multiple sinks.
In your program, the intermediate result a will be streamed to both filters (b and c) at the same time and both sinks will be written at the same time.
So in this case, there is no need to materialize the intermediate result a.

If you call execute() after you defined b, the program will compute a and stream the result only to b.
If you call execute() again after you defined c, the program will compute a again and stream the result to c.

Summary:
Flink programs can usually stream intermediate results without materializing them. There are a few cases where it needs to materialize intermediate results in order to avoid deadlocks, but these are fully transparently handled.
It is not possible (yet!) to share results across program executions, i.e., whenever you call execute().

I suppose, you call execute() between defining b and c. If you execute that call, a will be computed once and both b and c are computed at the same time.

Best, Fabian

2015-09-12 11:02 GMT+02:00 Michele Bertoni <[hidden email]>:
Hi everybody, I have a question about internal optimization
is flink able to reuse intermediate result that are used twice in the graph?

i.e.
a = readsource -> filter -> reduce -> something else even more complicated

b = a filter(something)
store b

c = a filter(something else)
store c

what happens to a? is it computed twice?

in my read function I have a some logging commands and I see the printed twice, but it sounds strange to me



thanks
cheers
michele