Hi,
is it possible to assign a "default" value to elements that didn't match?

For example, I have the following two datasets:

|DataSetA | DataSetB|
---------------------
|id=1     | id=1
|id=2     | id=3
|id=5     | id=4
|id=6     | id=6

When doing a join with:

A.join(B).where(KeySelector(A.id))
 .equalTo(KeySelector(B.id))

The resulting dataset is:

|(DataSetA | DataSetB)|
---------------------
|(id=1     | id=1)
|(id=6     | id=6)

What is the best way to assign a default value to the elements id=2/id=5 from DataSetA? E.g. I need a result which looks similar to this:

|(DataSetA | DataSetB)|
---------------------
|(id=1     | id=1)
|(id=2     | Default)
|(id=5     | Default)
|(id=6     | id=6)

My idea would be to get the missing elements from DataSetA by a .filter against (DataSetA|DataSetB) and then do a .union after creating a tuple with a default value. But that sounds a bit over-complicated.

Best regards,
Sebastian
Hello Sebastian,
You can use DataSet.leftOuterJoin for this.

Best,
Gábor

2017-02-10 12:58 GMT+01:00 Sebastian Neef <[hidden email]>:
> is it possible to assign a "default" value to elements that didn't match?
> [...]
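To illustrate what a left outer join does with the example data, here is a minimal sketch in plain Java. It is not Flink code: `LeftOuterJoinSketch` and its `leftOuterJoin` helper are hypothetical names used only for illustration. The point it mirrors is that in Flink's DataSet API the JoinFunction passed to `.with(...)` after `leftOuterJoin` is called with `null` as the right-hand element when a left element has no partner, so that is the place to substitute a default.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class LeftOuterJoinSketch {

    // Hypothetical helper (not part of Flink): joins two lists of ids the way
    // a left outer join does. The joiner receives null as the right-hand
    // element whenever the left element has no match.
    static List<String> leftOuterJoin(List<Integer> left, List<Integer> right,
                                      BiFunction<Integer, Integer, String> joiner) {
        // Index the right side by key so each left element is a single lookup.
        Map<Integer, List<Integer>> rightByKey = new HashMap<>();
        for (Integer r : right) {
            rightByKey.computeIfAbsent(r, k -> new ArrayList<>()).add(r);
        }
        List<String> out = new ArrayList<>();
        for (Integer l : left) {
            List<Integer> matches = rightByKey.get(l);
            if (matches == null) {
                out.add(joiner.apply(l, null));      // unmatched: right side is null
            } else {
                for (Integer r : matches) {
                    out.add(joiner.apply(l, r));     // matched: one row per pair
                }
            }
        }
        return out;
    }

    // Runs the join on the ids from the question and fills in "Default".
    static List<String> example() {
        List<Integer> a = Arrays.asList(1, 2, 5, 6);
        List<Integer> b = Arrays.asList(1, 3, 4, 6);
        return leftOuterJoin(a, b, (l, r) ->
                "(id=" + l + " | " + (r == null ? "Default" : "id=" + r) + ")");
    }

    public static void main(String[] args) {
        example().forEach(System.out::println);
    }
}
```

The null check inside the joiner is the only part that would carry over to a real JoinFunction; the indexing and looping are what the engine does for you.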
Hi,
thanks! That's exactly what I needed.

I'm now using: DataSetA.leftOuterJoin(DataSetB).where(new KeySelector()).equalTo(new KeySelector()).with(new JoinFunction(...)).

Now I get the following error:

> Caused by: org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.java.operators.JoinOperator$EquiJoin

I'm using flink-1.1.4-hd27.

Any ideas how I can fix that bug? It worked properly with a simple .join().

Regards,
Sebastian
I'm not sure what exactly is the problem, but could you check this FAQ item?
http://flink.apache.org/faq.html#why-am-i-getting-a-nonserializableexception-

Best,
Gábor

2017-02-10 14:16 GMT+01:00 Sebastian Neef <[hidden email]>:
> thanks! That's exactly what I needed.
> [...]
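The thread does not establish what caused the exception, so the following is only a sketch of one common way a NotSerializableException arises with user functions: an anonymous inner class keeps a hidden reference to the object that created it, and serializing the function then tries to serialize that object too. The example uses only the Java standard library; `KeyFn`, `JobBuilder`, and `StandaloneSelector` are hypothetical stand-ins, not Flink classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

    // Hypothetical stand-in for a serializable user function such as a key selector.
    interface KeyFn extends Serializable {
        int getKey(int value);
    }

    // Deliberately NOT Serializable, like an object that holds job-building state.
    static class JobBuilder {
        private final int offset;

        JobBuilder(int offset) {
            this.offset = offset;
        }

        KeyFn anonymousSelector() {
            // The anonymous class reads this.offset, so it captures the
            // enclosing JobBuilder instance as a hidden field.
            return new KeyFn() {
                @Override
                public int getKey(int value) {
                    return value + offset;
                }
            };
        }
    }

    // A static nested class has no enclosing instance to drag along.
    static class StandaloneSelector implements KeyFn {
        @Override
        public int getKey(int value) {
            return value;
        }
    }

    // Returns true if Java serialization can write the object, false if it
    // fails with NotSerializableException.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new JobBuilder(0).anonymousSelector()));
        System.out.println(isSerializable(new StandaloneSelector()));
    }
}
```

If the key selectors or the join function in the failing job are defined in a scope that references the join operator or another non-serializable object, moving them into static nested or top-level classes is one thing worth trying.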