Hi guys,
In my use case I have multiple datasets with the same structure (e.g. Tuple3), and I want to produce an output dataset containing all the Tuple3 records grouped by the first field (0). I can obtain the same result either by performing a union of all the datasets followed by a group-by (the simplest implementation) or by joining all of them pairwise ((((A->B)->C)->D)...), and I don't know whether there is any other solution. When should I use the first approach and when the second? Could you help me figure out the internals of the two approaches? I am always a bit afraid of chaining multiple joins when I don't know exactly how large the inputs are.
Best,
Flavio
|
Follow the first approach. Joins are expensive, union comes for free.
Best,
Fabian
2014-12-22 11:47 GMT+01:00 Flavio Pompermaier <[hidden email]>:
|
Ok, thanks Fabian. I'd just like to know the internals of the union of multiple datasets (partitioning, distribution among servers, memory/disk, etc.). Do you have any reference for this?
Thanks in advance,
Flavio
On Mon, Dec 22, 2014 at 12:46 PM, Fabian Hueske <[hidden email]> wrote:
|
Union just combines data from multiple sources into a single dataset. That's it. No memory or disk is involved. In your case you have
    input1.union(input2).groupBy(0).reduce(…)
This will translate into:
    input1 -> repartition ->
                             read-both-inputs -> sort -> reduce
    input2 -> repartition ->
So in your case not even additional network transfer is involved, because both datasets would need to be partitioned for the reduce anyway. Note that union in Flink has SQL UNION ALL semantics, i.e., there is no removal of duplicates.
Cheers,
Fabian
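To make the union-all-then-group semantics described above concrete, here is a minimal sketch in plain Java collections. It is only an analogy and does not use the Flink API: the `Triple` class, the `unionAll` and `sumByFirstField` helpers, and the choice of "sum the third field" as the reduce step are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class UnionThenGroupSketch {

    // Hypothetical stand-in for a Tuple3; the first field (f0) is the grouping key.
    static final class Triple {
        final String f0;
        final String f1;
        final int f2;

        Triple(String f0, String f1, int f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }
    }

    // Union-all: concatenate every input and keep duplicates.
    static List<Triple> unionAll(List<List<Triple>> inputs) {
        List<Triple> out = new ArrayList<>();
        for (List<Triple> input : inputs) {
            out.addAll(input);
        }
        return out;
    }

    // Group by the first field and reduce each group by summing the third field
    // (the reduce function is an assumption chosen for the example).
    static Map<String, Integer> sumByFirstField(List<Triple> rows) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Triple row : rows) {
            sums.merge(row.f0, row.f2, Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Triple> input1 = List.of(new Triple("k1", "a", 1));
        List<Triple> input2 = List.of(new Triple("k1", "a", 1), new Triple("k2", "b", 5));

        List<Triple> all = unionAll(List.of(input1, input2));
        System.out.println(all.size());           // prints 3: the duplicate row is kept
        System.out.println(sumByFirstField(all)); // prints {k1=2, k2=5}
    }
}
```

Because the duplicate `("k1", "a", 1)` row survives the union, it is counted twice in the group for `k1`. If duplicates should not contribute, they have to be removed explicitly before or during the reduce.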
|
Hi Fabian,
I was trying to use the strategy you suggested with Flink 0.8.1, but it seems that the union of the datasets cannot be created programmatically, because the union operator gives the generated dataset the name of the calling function, so that only the first dataset is read. My code looks like:
    private static DataSet<Tuple6<...>> getSourceDs(ExecutionEnvironment env, final String outputGraph, List<String> tableNames) {
        DataSet<Tuple6<...>> ret = null;
        for (String tableName : tableNames) {
            DataSet<Tuple6<...>> sourceDs = env.createInput(new MyTableInputFormat(tableName)) ....
            if (ret == null)
                ret = sourceDs;
            else
                ret.union(sourceDs);
        }
        return ret;
    }
Is this a bug or am I doing something wrong?
Thanks in advance,
Flavio
On Mon, Dec 22, 2014 at 2:42 PM, <[hidden email]> wrote:
|
As always, one minute after I sent the email I found the problem!
I have to reassign the result of the union to the initial dataset:
    ret = ret.union(sourceDs);
Bye,
Flavio
On Tue, Mar 17, 2015 at 5:58 PM, Flavio Pompermaier <[hidden email]> wrote:
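The pitfall here is that union returns a new dataset instead of modifying the one it is called on, so a discarded return value silently drops every input after the first. The sketch below reproduces that behaviour in plain Java. `Bag` is a hypothetical immutable stand-in for a data set, not a Flink class, and `buggyUnion` / `fixedUnion` are illustrative names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class UnionReassignSketch {

    // Hypothetical immutable stand-in for a data set: union() returns a new
    // object and leaves the receiver unchanged.
    static final class Bag<T> {
        private final List<T> items;

        Bag(List<T> items) {
            this.items = Collections.unmodifiableList(new ArrayList<>(items));
        }

        Bag<T> union(Bag<T> other) {
            List<T> merged = new ArrayList<>(items);
            merged.addAll(other.items);
            return new Bag<>(merged);
        }

        int size() {
            return items.size();
        }
    }

    // The result of union() is thrown away, so only the first source survives.
    static <T> Bag<T> buggyUnion(List<Bag<T>> sources) {
        Bag<T> ret = null;
        for (Bag<T> source : sources) {
            if (ret == null) {
                ret = source;
            } else {
                ret.union(source); // return value ignored
            }
        }
        return ret;
    }

    // The result of union() is reassigned, so every source is included.
    static <T> Bag<T> fixedUnion(List<Bag<T>> sources) {
        Bag<T> ret = null;
        for (Bag<T> source : sources) {
            ret = (ret == null) ? source : ret.union(source);
        }
        return ret;
    }

    public static void main(String[] args) {
        List<Bag<Integer>> sources = List.of(
                new Bag<Integer>(List.of(1, 2)),
                new Bag<Integer>(List.of(3)));
        System.out.println(buggyUnion(sources).size()); // prints 2
        System.out.println(fixedUnion(sources).size()); // prints 3
    }
}
```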
|
Yes, that is the way to do it. This makes me think that it would be nice to have a method that builds the union of a list of data sets:
    DataSet<T> union(DataSet<T>... sets)
It would be implemented like your loop. Would that be helpful?
Stephan
On Tue, Mar 17, 2015 at 6:03 PM, Flavio Pompermaier <[hidden email]> wrote:
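A rough idea of what such a varargs helper could look like, sketched with `java.util.stream.Stream` as a stand-in for a data set. This is not an existing Flink method; the class name, the use of `Stream.concat`, and the choice to reject an empty argument list are assumptions for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class VarargsUnionSketch {

    // Folds the inputs left to right, reassigning the running result on each
    // step, the same way the loop in the thread does.
    @SafeVarargs
    static <T> Stream<T> union(Stream<T>... sets) {
        if (sets.length == 0) {
            // Assumption: with no inputs there is nothing sensible to return.
            throw new IllegalArgumentException("at least one input is required");
        }
        Stream<T> ret = sets[0];
        for (int i = 1; i < sets.length; i++) {
            ret = Stream.concat(ret, sets[i]);
        }
        return ret;
    }

    public static void main(String[] args) {
        List<Integer> all = union(Stream.of(1, 2), Stream.of(2), Stream.of(3))
                .collect(Collectors.toList());
        System.out.println(all); // prints [1, 2, 2, 3]
    }
}
```

As with the union discussed in this thread, duplicates are kept. A helper along these lines would mainly save callers from writing the null check and the reassignment by hand.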
|
I don't know if that could be useful, do you?
On Tue, Mar 17, 2015 at 10:29 PM, Stephan Ewen <[hidden email]> wrote:
|