Hi to all,
I have a case where I don't understand why flink is not able to optimize the join between 2 datasets. My initial code was basically this: DataSet<Tuple2<String, List<ThriftObj>>> bigDataset = ...;//5.257.207 elements DataSet<Tuple2<String,List<MyObject>>> attrToExpand = ...;//65.000 elements DataSet<Tuple2<String, IndexAttributeToExpand>> tmp = attrToExpand.joinWithHuge(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1); This job wasn't able to complete on my local machine (from Eclipse) because Flink was giving me the following error: Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys. This was because in attrToExpand the List<MyObject> could be quite big. Indeed, changing that code to the following make everything work like a charm: DataSet<Tuple2<String, List<ThriftObj>>> subset = attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2(); DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset = attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1); Isn't something impossible for Flink to optimize my initial code into the second? I was convinced that Flink was performing a join only on the keys before grabbing also the other elements of the Tuples into memory..am I wrong? Best, Flavio |
Hi Flavio! No, Flink does not join keys before full values. That is very often very inefficient, as it results effectively in two joins where one is typically about as expensive as the original join. One can do "semi-join-reduction", in case the join filters out many values (many elements from one side do not find a match in the other side). If the join does not filter, this does not help either. Your code is a bit of a surprise. Especially, because in you solution that worked, the first statement does nothing: DataSet<Tuple2<String, List<ThriftObj>>> subset = attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2(); This builds a join, but then takes the second input of the join (the bigDataset data set). Because the result of the join is never actually used, it is never executed. The second statement hence is effectively DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset = attrToExpand.join(bigDataset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1); Curious why this executed when the original did not. BTW: If the Lists are very long so they do not fit into a hashtable memory partition, you can try to use a JoinHint to create a sort-merge join. It may become slower, but typically works with even less memory. Greetings, Stephan On Tue, Sep 8, 2015 at 9:59 AM, Flavio Pompermaier <[hidden email]> wrote:
|
Obviously when trying to simplify my code I didn't substitute correctly the variable of the join..it should be:
DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset = attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1); Do you think that a JoinHint to create a sort-merge join is equivalent to my solution? On Tue, Sep 8, 2015 at 10:45 AM, Stephan Ewen <[hidden email]> wrote:
|
The problem is the "getInput2()" call. It takes the input to the join, not the result of the join. That way, the first join never happens. On Tue, Sep 8, 2015 at 11:10 AM, Flavio Pompermaier <[hidden email]> wrote:
|
Ah..Fortunately it seems to do what I need :)
It efficiently filters the bigDataset retaining only the needed elements making the join feasible with few memory.. :) So that's a bug? Which should be the right way to achieve that behaviour with Flink? On Tue, Sep 8, 2015 at 11:22 AM, Stephan Ewen <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |