Case of possible join optimization

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Case of possible join optimization

Flavio Pompermaier
Hi to all,

I have a case where I don't understand why flink is not able to optimize the join between 2 datasets.

My initial code was basically this:

DataSet<Tuple2<String, List<ThriftObj>>> bigDataset = ...;//5.257.207 elements
DataSet<Tuple2<String,List<MyObject>>> attrToExpand = ...;//65.000 elements

DataSet<Tuple2<String, IndexAttributeToExpand>> tmp = 
attrToExpand.joinWithHuge(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);

This job wasn't able to complete on my local machine (from Eclipse) because Flink was giving me the following error:

Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys.

This was because in attrToExpand the List<MyObject> could be quite big. Indeed, changing that code to the following make everything work like a charm:

DataSet<Tuple2<String, List<ThriftObj>>> subset =
      attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset = 
      attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);


Isn't something impossible for Flink to optimize my initial code into the second? I was convinced that Flink was performing a join only on the keys before grabbing also the other elements of the Tuples into memory..am I wrong?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Case of possible join optimization

Stephan Ewen
Hi Flavio!

No, Flink does not join keys before full values. That is very often very inefficient, as it results effectively in two joins where one is typically about as expensive as the original join.

One can do "semi-join-reduction", in case the join filters out many values (many elements from one side do not find a match in the other side). If the join does not filter, this does not help either.

Your code is a bit of a surprise. Especially, because in you solution that worked, the first statement does nothing:

DataSet<Tuple2<String, List<ThriftObj>>> subset =
      attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();


This builds a join, but then takes the second input of the join (the bigDataset data set). Because the result of the join is never actually used, it is never executed. The second statement hence is effectively

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset = 
      attrToExpand.join(bigDataset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);


Curious why this executed when the original did not.

BTW: If the Lists are very long so they do not fit into a hashtable memory partition, you can try to use a JoinHint to create a sort-merge join. It may become slower, but typically works with even less memory.


Greetings,
Stephan


On Tue, Sep 8, 2015 at 9:59 AM, Flavio Pompermaier <[hidden email]> wrote:
Hi to all,

I have a case where I don't understand why flink is not able to optimize the join between 2 datasets.

My initial code was basically this:

DataSet<Tuple2<String, List<ThriftObj>>> bigDataset = ...;//5.257.207 elements
DataSet<Tuple2<String,List<MyObject>>> attrToExpand = ...;//65.000 elements

DataSet<Tuple2<String, IndexAttributeToExpand>> tmp = 
attrToExpand.joinWithHuge(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);

This job wasn't able to complete on my local machine (from Eclipse) because Flink was giving me the following error:

Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys.

This was because in attrToExpand the List<MyObject> could be quite big. Indeed, changing that code to the following make everything work like a charm:

DataSet<Tuple2<String, List<ThriftObj>>> subset =
      attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset = 
      attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);


Isn't something impossible for Flink to optimize my initial code into the second? I was convinced that Flink was performing a join only on the keys before grabbing also the other elements of the Tuples into memory..am I wrong?

Best,
Flavio

Reply | Threaded
Open this post in threaded view
|

Re: Case of possible join optimization

Flavio Pompermaier
Obviously when trying to simplify my code I didn't substitute correctly the variable of the join..it should be:

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
      attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);

Do you think that a JoinHint to create a sort-merge join is equivalent to my solution?

On Tue, Sep 8, 2015 at 10:45 AM, Stephan Ewen <[hidden email]> wrote:
Hi Flavio!

No, Flink does not join keys before full values. That is very often very inefficient, as it results effectively in two joins where one is typically about as expensive as the original join.

One can do "semi-join-reduction", in case the join filters out many values (many elements from one side do not find a match in the other side). If the join does not filter, this does not help either.

Your code is a bit of a surprise. Especially, because in you solution that worked, the first statement does nothing:

DataSet<Tuple2<String, List<ThriftObj>>> subset =
      attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();


This builds a join, but then takes the second input of the join (the bigDataset data set). Because the result of the join is never actually used, it is never executed. The second statement hence is effectively

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset = 
      attrToExpand.join(bigDataset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);


Curious why this executed when the original did not.

BTW: If the Lists are very long so they do not fit into a hashtable memory partition, you can try to use a JoinHint to create a sort-merge join. It may become slower, but typically works with even less memory.


Greetings,
Stephan


On Tue, Sep 8, 2015 at 9:59 AM, Flavio Pompermaier <[hidden email]> wrote:
Hi to all,

I have a case where I don't understand why flink is not able to optimize the join between 2 datasets.

My initial code was basically this:

DataSet<Tuple2<String, List<ThriftObj>>> bigDataset = ...;//5.257.207 elements
DataSet<Tuple2<String,List<MyObject>>> attrToExpand = ...;//65.000 elements

DataSet<Tuple2<String, IndexAttributeToExpand>> tmp = 
attrToExpand.joinWithHuge(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);

This job wasn't able to complete on my local machine (from Eclipse) because Flink was giving me the following error:

Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys.

This was because in attrToExpand the List<MyObject> could be quite big. Indeed, changing that code to the following make everything work like a charm:

DataSet<Tuple2<String, List<ThriftObj>>> subset =
      attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset = 
      attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);


Isn't something impossible for Flink to optimize my initial code into the second? I was convinced that Flink was performing a join only on the keys before grabbing also the other elements of the Tuples into memory..am I wrong?

Best,
Flavio



Reply | Threaded
Open this post in threaded view
|

Re: Case of possible join optimization

Stephan Ewen
The problem is the "getInput2()" call. It takes the input to the join, not the result of the join. That way, the first join never happens.

On Tue, Sep 8, 2015 at 11:10 AM, Flavio Pompermaier <[hidden email]> wrote:
Obviously when trying to simplify my code I didn't substitute correctly the variable of the join..it should be:

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
      attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);

Do you think that a JoinHint to create a sort-merge join is equivalent to my solution?


On Tue, Sep 8, 2015 at 10:45 AM, Stephan Ewen <[hidden email]> wrote:
Hi Flavio!

No, Flink does not join keys before full values. That is very often very inefficient, as it results effectively in two joins where one is typically about as expensive as the original join.

One can do "semi-join-reduction", in case the join filters out many values (many elements from one side do not find a match in the other side). If the join does not filter, this does not help either.

Your code is a bit of a surprise. Especially, because in you solution that worked, the first statement does nothing:

DataSet<Tuple2<String, List<ThriftObj>>> subset =
      attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();


This builds a join, but then takes the second input of the join (the bigDataset data set). Because the result of the join is never actually used, it is never executed. The second statement hence is effectively

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset = 
      attrToExpand.join(bigDataset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);


Curious why this executed when the original did not.

BTW: If the Lists are very long so they do not fit into a hashtable memory partition, you can try to use a JoinHint to create a sort-merge join. It may become slower, but typically works with even less memory.


Greetings,
Stephan


On Tue, Sep 8, 2015 at 9:59 AM, Flavio Pompermaier <[hidden email]> wrote:
Hi to all,

I have a case where I don't understand why flink is not able to optimize the join between 2 datasets.

My initial code was basically this:

DataSet<Tuple2<String, List<ThriftObj>>> bigDataset = ...;//5.257.207 elements
DataSet<Tuple2<String,List<MyObject>>> attrToExpand = ...;//65.000 elements

DataSet<Tuple2<String, IndexAttributeToExpand>> tmp = 
attrToExpand.joinWithHuge(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);

This job wasn't able to complete on my local machine (from Eclipse) because Flink was giving me the following error:

Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys.

This was because in attrToExpand the List<MyObject> could be quite big. Indeed, changing that code to the following make everything work like a charm:

DataSet<Tuple2<String, List<ThriftObj>>> subset =
      attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset = 
      attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);


Isn't something impossible for Flink to optimize my initial code into the second? I was convinced that Flink was performing a join only on the keys before grabbing also the other elements of the Tuples into memory..am I wrong?

Best,
Flavio




Reply | Threaded
Open this post in threaded view
|

Re: Case of possible join optimization

Flavio Pompermaier
Ah..Fortunately it seems to do what I need :)
It efficiently filters the bigDataset retaining only the needed elements making the join feasible with few memory.. :)
So that's a bug? Which should be the right way to achieve that behaviour with Flink? 

On Tue, Sep 8, 2015 at 11:22 AM, Stephan Ewen <[hidden email]> wrote:
The problem is the "getInput2()" call. It takes the input to the join, not the result of the join. That way, the first join never happens.

On Tue, Sep 8, 2015 at 11:10 AM, Flavio Pompermaier <[hidden email]> wrote:
Obviously when trying to simplify my code I didn't substitute correctly the variable of the join..it should be:

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
      attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);

Do you think that a JoinHint to create a sort-merge join is equivalent to my solution?


On Tue, Sep 8, 2015 at 10:45 AM, Stephan Ewen <[hidden email]> wrote:
Hi Flavio!

No, Flink does not join keys before full values. That is very often very inefficient, as it results effectively in two joins where one is typically about as expensive as the original join.

One can do "semi-join-reduction", in case the join filters out many values (many elements from one side do not find a match in the other side). If the join does not filter, this does not help either.

Your code is a bit of a surprise. Especially, because in you solution that worked, the first statement does nothing:

DataSet<Tuple2<String, List<ThriftObj>>> subset =
      attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();


This builds a join, but then takes the second input of the join (the bigDataset data set). Because the result of the join is never actually used, it is never executed. The second statement hence is effectively

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset = 
      attrToExpand.join(bigDataset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);


Curious why this executed when the original did not.

BTW: If the Lists are very long so they do not fit into a hashtable memory partition, you can try to use a JoinHint to create a sort-merge join. It may become slower, but typically works with even less memory.


Greetings,
Stephan


On Tue, Sep 8, 2015 at 9:59 AM, Flavio Pompermaier <[hidden email]> wrote:
Hi to all,

I have a case where I don't understand why flink is not able to optimize the join between 2 datasets.

My initial code was basically this:

DataSet<Tuple2<String, List<ThriftObj>>> bigDataset = ...;//5.257.207 elements
DataSet<Tuple2<String,List<MyObject>>> attrToExpand = ...;//65.000 elements

DataSet<Tuple2<String, IndexAttributeToExpand>> tmp = 
attrToExpand.joinWithHuge(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);

This job wasn't able to complete on my local machine (from Eclipse) because Flink was giving me the following error:

Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys.

This was because in attrToExpand the List<MyObject> could be quite big. Indeed, changing that code to the following make everything work like a charm:

DataSet<Tuple2<String, List<ThriftObj>>> subset =
      attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset = 
      attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);


Isn't something impossible for Flink to optimize my initial code into the second? I was convinced that Flink was performing a join only on the keys before grabbing also the other elements of the Tuples into memory..am I wrong?

Best,
Flavio