Hi to all, I have to join two datasets but I'd like to keep all data in the left also if there' no right dataset. How can you achieve that in Flink? maybe I should use coGroup? Best, Flavio |
On 15 Apr 2015, at 10:30, Flavio Pompermaier <[hidden email]> wrote: > > Hi to all, > I have to join two datasets but I'd like to keep all data in the left also if there' no right dataset. > How can you achieve that in Flink? maybe I should use coGroup? Yes, currently you have to implement this manually with a coGroup |
Do you have an already working example of it? :)
On Wed, Apr 15, 2015 at 10:32 AM, Ufuk Celebi <[hidden email]> wrote:
|
please add link to explain left join using cogroup
or add example very thanks |
In reply to this post by Flavio Pompermaier
I think this may be a great example to add as a utility function. Or actually add as an function to the DataSet, internally realized as a special case of coGroup. We do not have a ready example of that, but it should be straightforward to realize. Similar as for the join, coGroup on the join keys. Inside the coGroup function, emit the combination of all values from the two iterators. If one of them is empty (the one that is not outer) then emit all values from the outer side. Greetings, Stephan On Wed, Apr 15, 2015 at 10:36 AM, Flavio Pompermaier <[hidden email]> wrote:
|
Hi Flavio, Here's an simple example of a Left Outer Join: https://gist.github.com/mxm/c2e9c459a9d82c18d789As Stephan pointed out, this can be very easily modified to construct a Right Outer Join (just exchange leftElements and rightElements in the two loops). public static class LeftOuterJoin implements CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, Integer>> { On Wed, Apr 15, 2015 at 11:01 AM, Stephan Ewen <[hidden email]> wrote:
|
That solution works if you can define a NULL_ELEMENT but not if you want to use the full value range of Integer. This is something that we need to solve a bit differently. Maybe by adding optional null-valued field support to Tuple. 2015-04-15 5:59 GMT-05:00 Maximilian Michels <[hidden email]>:
|
This is something that we need to solve a bit differently. That was just a proof of concept. I agree, for a proper implementation, one would need to differentiate between a regular element and a NULL element. On Thu, Apr 16, 2015 at 3:23 PM, Fabian Hueske <[hidden email]> wrote:
|
In reply to this post by Maximilian Michels
Hi Maximilian,
I tried your solution but it doesn't work because the rightElements iterator cannot be used more than once: Caused by: org.apache.flink.util.TraversableOnceException: The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed. On Wed, Apr 15, 2015 at 12:59 PM, Maximilian Michels <[hidden email]> wrote:
|
You can materialize the input of the right input by creating an array out of it, for example. Then you can reiterate over it. Cheers, On Apr 16, 2015 7:37 PM, "Flavio Pompermaier" <[hidden email]> wrote:
|
I cannot find a solution to my use case :( I have 2 datasets D1 and D2 like: D1: A,p1,a1 A,p2,a2 A,p3,X B,p3,Y B,p1,b1 D2: X,s,V X,r,2 Y,j,k I'd like to have a unique dataset D3(Tuple4) like A,X,a1,a2 B,Y,b1,null Basically filling with <D1.f0,D2.f0,D1.f2(D1.f1==p1),D1.f2(if D1.f1==p2)> when D1.f2==D2.f0. Is that possible and how? Could you show me a simple snippet? Thanks in advance, Flavio On Thu, Apr 16, 2015 at 9:48 PM, Till Rohrmann <[hidden email]> wrote:
|
Hi Flavio, I don't really understand what you try to do. What does D1.f2(D1.f1==p1) mean? What does happen if the condition in D1.f2(if D1.f1==p2) is false? Where does the values a1 and a2 in (A, X, a1, a2) come from when you join [(A, p3, X), (X, s, V)] and [(A, p3, X), (X, r, 2)]? Maybe you can elaborate a bit more on your example. Cheers, Till On Thu, Apr 16, 2015 at 10:09 PM, Flavio Pompermaier <[hidden email]> wrote:
|
Hi Till,
thanks for the reply. What I'd like to do is to merge D1 and D2 if there's a ref from D1 to D2 (D1.f2==D2.f0). If this condition is true, I would like to produce a set of tuples with the matching elements at the first to places (D1.f2, D2.f0) and the other two values (if present) of the matching tuple in D1 when D1.f1=="a1" and D1.f2="a2" (string values) respectively. (PS: For each value of D1.f0 you can have at most one value of a1 and a2) Is it more clear? On Fri, Apr 17, 2015 at 9:03 AM, Till Rohrmann <[hidden email]> wrote:
|
If it's fine when you have null string values in the cases where D1.f1!="a1" or D1.f2!="a2" then a possible solution could look like (with Scala API): val ds1: DataSet[(String, String, String)] = getDS1 val ds2: DataSet[(String, String, String)] = getDS2 ds1.coGroup(ds2).where(2).equalTo(0) { (left, right, collector: Collector[(String, String, String, String)]) => { if(right.isEmpty) { left foreach { element => { val value1 = if(element._2 == "a1") element._3 else null val value2 = if(element._2 == "a2") element._3 else null collector.collect((element._1, null, value1, value2)) } } } else { val array = right.toArray for(leftElement <- left) { val value1 = if(leftElement._2 == "a1") leftElement._3 else null val value2 = if(leftElement._2 == "a2") leftElement._3 else null for(rightElement <- array) { collector.collect(leftElement._1, rightElement._1, value1, value2)) } } } } } Does this solve your problem? On Fri, Apr 17, 2015 at 9:30 AM, Flavio Pompermaier <[hidden email]> wrote:
|
Could resolve the problem but the fact to accumulate stuff in a local variable is it safe if datasets are huge..?
On Fri, Apr 17, 2015 at 9:54 AM, Till Rohrmann <[hidden email]> wrote:
|
No its not, but at the moment there is afaik no other way around it. There is an issue for proper outer join support [1] On Fri, Apr 17, 2015 at 10:01 AM, Flavio Pompermaier <[hidden email]> wrote:
|
That would be very helpful...
Thanks for the support, Flavio On Fri, Apr 17, 2015 at 10:04 AM, Till Rohrmann <[hidden email]> wrote:
|
If you know that the group cardinality of one input is always 1 (or 0) you can make that input the one to cache in memory and stream the other input with potentially more group elements. 2015-04-17 4:09 GMT-05:00 Flavio Pompermaier <[hidden email]>:
|
Could you explain a little more in detail this caching mechanism with a simple code snippet...? Thanks, On Apr 17, 2015 1:12 PM, "Fabian Hueske" <[hidden email]> wrote:
|
There is no caching mechanism. To do the left outer join as in Tills implementation, you need to collect all elements of one! iterator in memory. If you know, that one of the two iterators contains at most 1 element, you should collect that in memory and stream the elements of the other iterator.2015-04-17 6:18 GMT-05:00 Flavio Pompermaier <[hidden email]>:
|
Free forum by Nabble | Edit this page |