Join with Default-Value

Join with Default-Value

Sebastian Neef
Hi,

is it possible to assign a "default" value to elements that didn't match?

For example I have the following two datasets:

|DataSetA | DataSetB|
---------------------
|id=1  | id=1
|id=2  | id=3
|id=5  | id=4
|id=6  | id=6

When doing a join with:

A.join(B).where( KeySelector(A.id))
        .equalTo(KeySelector(B.id))

The resulting dataset is:

|(DataSetA | DataSetB)|
---------------------
|(id=1  | id=1)
|(id=6  | id=6)

What is the best way to assign a default value to the elements id=2/id=5
from DataSetA? E.g., I need a result that looks similar to this:

|(DataSetA | DataSetB)|
---------------------
|(id=1  | id=1)
|(id=2  | Default)
|(id=5  | Default)
|(id=6  | id=6)

My idea would be to use a .filter to find the elements of DataSetA that
are missing from the joined (DataSetA|DataSetB) result, create a tuple with
a default value for each of them, and then .union those tuples with the join
result. But that sounds a bit over-complicated.

Best regards,
Sebastian

Re: Join with Default-Value

Gábor Gévay
Hello Sebastian,

You can use DataSet.leftOuterJoin for this.

Best,
Gábor
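
To make the suggestion above concrete: in Flink's DataSet API, the JoinFunction passed to a left outer join is called with null as its second argument for every left element that has no match, so that is the place to substitute a default. The following is only a plain-Java sketch of those semantics, without any Flink dependency, so that it runs on its own. The class name, the leftOuterJoin helper, and the "Default" placeholder are assumptions made for illustration and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LeftOuterJoinSketch {

    /** Hypothetical placeholder used when a left element has no match. */
    static final String DEFAULT = "Default";

    /**
     * Left outer join of two id lists, keyed on the id itself.
     * Each result row is {left, right}; right is DEFAULT when nothing matches.
     */
    static List<String[]> leftOuterJoin(List<Integer> a, List<Integer> b) {
        // Build a lookup table for the right side: key -> elements with that key.
        Map<Integer, List<Integer>> rightByKey = new HashMap<>();
        for (Integer id : b) {
            rightByKey.computeIfAbsent(id, k -> new ArrayList<>()).add(id);
        }

        List<String[]> result = new ArrayList<>();
        for (Integer left : a) {
            List<Integer> matches = rightByKey.get(left);
            if (matches == null) {
                // Mirrors a join function that receives null for the right side
                // and substitutes a default value instead.
                result.add(new String[] {"id=" + left, DEFAULT});
            } else {
                for (Integer right : matches) {
                    result.add(new String[] {"id=" + left, "id=" + right});
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> rows = leftOuterJoin(
                Arrays.asList(1, 2, 5, 6),   // DataSetA from the question
                Arrays.asList(1, 3, 4, 6));  // DataSetB from the question
        for (String[] row : rows) {
            System.out.println("(" + row[0] + " | " + row[1] + ")");
        }
    }
}
```

With the ids from the question, the rows for id=2 and id=5 carry the placeholder, while id=3 and id=4 from DataSetB do not appear because only the left side is preserved.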




In reply to Sebastian Neef's message of 2017-02-10 12:58 GMT+01:00.

Re: Join with Default-Value

Sebastian Neef
Hi,

thanks! That's exactly what I needed.

I'm now using:

DataSetA.leftOuterJoin(DataSetB).where(new KeySelector())
        .equalTo(new KeySelector()).with(new JoinFunction(...))

Now I get the following error:

> Caused by: org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.java.operators.JoinOperator$EquiJoin

I'm using flink-1.1.4-hd27.

Any ideas how I can fix that bug? It worked properly with a simple .join().

Regards,
Sebastian

Re: Join with Default-Value

Gábor Gévay
I'm not sure what exactly the problem is, but could you check this FAQ item?
http://flink.apache.org/faq.html#why-am-i-getting-a-nonserializableexception-

Best,
Gábor
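
The thread does not confirm what caused the exception. One common cause of a NotSerializableException for user functions, which Flink ships using Java serialization, is an anonymous class that holds a reference to its enclosing object, so the enclosing object's fields get serialized too. The plain-Java sketch below reproduces that effect without Flink. KeyFn, Job, and NotSerializableThing are hypothetical stand-ins for a KeySelector, the job class, and a non-serializable field such as a join operator.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClosureCaptureSketch {

    /** Hypothetical stand-in for a serializable user function such as a KeySelector. */
    interface KeyFn extends Serializable {
        int getKey(int value);
    }

    /** Hypothetical stand-in for a non-serializable object held by the job class. */
    static class NotSerializableThing { }

    /** Static nested class: carries no reference to any enclosing instance. */
    static class IdKey implements KeyFn {
        @Override
        public int getKey(int value) {
            return value;
        }
    }

    /** Job class that is Serializable itself but has a non-serializable field. */
    static class Job implements Serializable {
        final NotSerializableThing joined = new NotSerializableThing();
        final int offset = 0;

        KeyFn anonymousKey() {
            // The anonymous class reads a field of the enclosing Job, so it keeps
            // a reference to that Job, and serializing it drags in 'joined'.
            return new KeyFn() {
                @Override
                public int getKey(int value) {
                    return value + offset;
                }
            };
        }
    }

    /** Returns true if Java serialization of obj succeeds. */
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("static nested class: " + isSerializable(new IdKey()));
        System.out.println("anonymous class:     " + isSerializable(new Job().anonymousKey()));
    }
}
```

If this is what happens in the job, moving the key selector and join function into static nested or top-level classes, or keeping non-serializable fields out of the enclosing class, would avoid the capture.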



In reply to Sebastian Neef's message of 2017-02-10 14:16 GMT+01:00.