Union of multiple datasets vs Join

Flavio Pompermaier
Hi guys,

In my use case I have multiple DataSets with the same structure (e.g. Tuple3), and I want to produce an output DataSet containing all the Tuple3 records grouped by the first field (0).
I can obtain the same result either by performing a union of all the datasets followed by a group-by (the simplest implementation), or by joining them pairwise, ((A->B)->C)->D and so on; I don't know whether there is any other solution. When should I use the first approach and when the second? Could you help me understand the internals of the two approaches? I am always a bit afraid of chaining multiple joins when I don't know the exact size of the inputs.

Best,
Flavio

Re: Union of multiple datasets vs Join

Fabian Hueske
Follow the first approach. 
Joins are expensive, union comes for free.

Best, Fabian

(In reply to Flavio Pompermaier's message of 2014-12-22 11:47 GMT+01:00.)

Re: Union of multiple datasets vs Join

Flavio Pompermaier
OK, thanks Fabian. I'd just like to know the internals of the union of multiple datasets (partitioning, distribution among servers, memory/disk usage, etc.). Do you have any reference for this?

Thanks in advance,
Flavio

(In reply to Fabian Hueske's message of Mon, Dec 22, 2014, 12:46 PM.)

Re: Union of multiple datasets vs Join

Fabian Hueske-2
Union is just combining data from multiple sources into a single dataset.
That’s it. No memory, no disk involved.

In your case you have:

input1.union(input2).groupBy(1).reduce(…)

This will translate into:

input1 -> repartition -> \
                          +-> read-both-inputs -> sort -> reduce
input2 -> repartition -> /

So in your case the union does not even add network transfer, because both data sets have to be repartitioned for the reduce anyway.

Note that union in Flink has SQL UNION ALL semantics, i.e., duplicates are not removed.
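
A minimal sketch of the union-all-then-reduce semantics described above, written in plain Java collections rather than Flink's API. The class `UnionGroupSketch` and the methods `unionAll` and `sumByKey` are hypothetical names chosen for illustration, and summing is just one possible reduce function. It only shows that the union is a plain concatenation that keeps duplicates, and that all the real work happens in the grouping step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; this is NOT Flink code and all names are hypothetical.
class UnionGroupSketch {

    // Union-all: concatenate the inputs and keep every record, duplicates included.
    static <T> List<T> unionAll(List<List<T>> inputs) {
        List<T> out = new ArrayList<>();
        for (List<T> input : inputs) {
            out.addAll(input);
        }
        return out;
    }

    // Group the records by key and reduce each group by summing its values.
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Map.Entry<String, Integer> record : records) {
            sums.merge(record.getKey(), record.getValue(), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input1 = List.of(Map.entry("a", 1), Map.entry("b", 2));
        List<Map.Entry<String, Integer>> input2 = List.of(Map.entry("a", 1), Map.entry("c", 5));

        List<Map.Entry<String, Integer>> all = unionAll(List.of(input1, input2));
        System.out.println(all.size());    // 4: the duplicate ("a", 1) is kept
        System.out.println(sumByKey(all)); // {a=2, b=2, c=5}
    }
}
```

If duplicates must be dropped, that would need an explicit extra step after the union; the concatenation itself never removes them.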

Cheers, Fabian 

(In reply to Flavio Pompermaier's message of Monday, 22 December 2014, 14:32.)

Re: Union of multiple datasets vs Join

Flavio Pompermaier
Hi Fabian,
I was trying to use the strategy you suggested with Flink 0.8.1, but it seems that the union of the datasets cannot be built programmatically: the union operator appears to name the generated dataset after the calling function, so that only the first dataset is read. My code looks like this:

private static DataSet<Tuple6<...>> getSourceDs(ExecutionEnvironment env, final String outputGraph, List<String> tableNames) {
    DataSet<Tuple6<...>> ret = null;
    for (String tableName : tableNames) {
        DataSet<Tuple6<...>> sourceDs = env.createInput(new MyTableInputFormat(tableName))
                ....

        if (ret == null)
            ret = sourceDs;
        else
            ret.union(sourceDs);
    }
    return ret;
}

Is this a bug, or am I doing something wrong?
Thanks in advance,
Flavio

(In reply to Fabian Hueske's message of Mon, Dec 22, 2014, 2:42 PM.)

Re: Union of multiple datasets vs Join

Flavio Pompermaier
As always, one minute after sending the email I found the problem!
The result of the union has to be assigned back to the initial dataset:
    ret = ret.union(sourceDs);
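
To make the difference concrete, here is a small self-contained sketch in plain Java. `MiniDataSet` is a hypothetical stand-in for an immutable dataset handle, not Flink's `DataSet` class; the only assumption it mirrors from the thread is that `union` returns a new object instead of modifying the receiver.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java illustration only; ReassignSketch and MiniDataSet are hypothetical names.
class ReassignSketch {

    // Immutable stand-in for a dataset handle (NOT Flink's DataSet).
    static final class MiniDataSet {
        private final List<String> records;

        MiniDataSet(List<String> records) {
            this.records = Collections.unmodifiableList(new ArrayList<>(records));
        }

        // Returns a NEW object holding both inputs; the receiver is left untouched.
        MiniDataSet union(MiniDataSet other) {
            List<String> merged = new ArrayList<>(records);
            merged.addAll(other.records);
            return new MiniDataSet(merged);
        }

        int size() {
            return records.size();
        }
    }

    // Original loop: the value returned by union() is discarded.
    static MiniDataSet unionDiscarded(List<MiniDataSet> sources) {
        MiniDataSet ret = null;
        for (MiniDataSet source : sources) {
            if (ret == null) {
                ret = source;
            } else {
                ret.union(source); // result thrown away, ret still points to the first source
            }
        }
        return ret;
    }

    // Corrected loop: the value returned by union() is assigned back to ret.
    static MiniDataSet unionReassigned(List<MiniDataSet> sources) {
        MiniDataSet ret = null;
        for (MiniDataSet source : sources) {
            ret = (ret == null) ? source : ret.union(source);
        }
        return ret;
    }

    public static void main(String[] args) {
        List<MiniDataSet> sources = List.of(
                new MiniDataSet(List.of("t1")),
                new MiniDataSet(List.of("t2")),
                new MiniDataSet(List.of("t3")));
        System.out.println(unionDiscarded(sources).size());  // 1
        System.out.println(unionReassigned(sources).size()); // 3
    }
}
```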

Bye,
Flavio

(In reply to Flavio Pompermaier's message of Tue, Mar 17, 2015, 5:58 PM.)

Re: Union of multiple datasets vs Join

Stephan Ewen
Yes, that is the way to do it.

This makes me think that it would be nice to have a method that builds the union of a list of data sets.

DataSet<T> union(DataSet<T>... sets)

It would be implemented like in your loop. Would that be helpful?
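
A rough sketch of what such a helper could look like, written generically in plain Java so that it runs without Flink. The names `UnionHelperSketch` and `unionOf` are hypothetical, and the binary union is passed in as a parameter; with Flink one would presumably pass something like `DataSet::union`, but that wiring is an assumption and is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java illustration only; UnionHelperSketch and unionOf are hypothetical names.
class UnionHelperSketch {

    // Folds a non-empty list of data sets into one by applying a binary union repeatedly,
    // assigning the result back on every step (the same idea as the corrected loop).
    static <D> D unionOf(List<D> sets, BinaryOperator<D> union) {
        if (sets.isEmpty()) {
            throw new IllegalArgumentException("need at least one data set");
        }
        D result = sets.get(0);
        for (D next : sets.subList(1, sets.size())) {
            result = union.apply(result, next);
        }
        return result;
    }

    public static void main(String[] args) {
        // Lists of strings stand in for data sets; concatenation stands in for union.
        BinaryOperator<List<String>> concat = (a, b) -> {
            List<String> out = new ArrayList<>(a);
            out.addAll(b);
            return out;
        };
        List<String> all = unionOf(List.of(List.of("a"), List.of("b", "a"), List.of("c")), concat);
        System.out.println(all); // [a, b, a, c]
    }
}
```

Rejecting an empty list avoids the `null` return value that the hand-written loop produces when no table names are given.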

Stephan


(In reply to Flavio Pompermaier's message of Tue, Mar 17, 2015, 6:03 PM.)

Re: Union of multiple datasets vs Join

Flavio Pompermaier
I don't know whether that would be useful. What do you think?

(In reply to Stephan Ewen's message of Tue, Mar 17, 2015, 10:29 PM.)