ds1.filter(/* selection of query 1 goes here */)
ds2.filter(/* selection of query 2 goes here */)
exists:
ds1.join(ds2.distinct(id)).where(id).equalTo(id) { // join on your join key(s); note the distinct operator, otherwise you get one output line per matching right-hand line
  (left, right) => left // keep the left element
}
or
ds1.coGroup(ds2).where(id).equalTo(id) { // coGroup on your join key(s)
  (left: Iterator[T], right: Iterator[U], out: Collector[T]) => // T, U: element types of ds1, ds2
    if (right.hasNext) // something exists in the right dataset
      while (left.hasNext) // emit all the left elements
        out.collect(left.next())
}
not exists:
ds1.coGroup(ds2).where(id).equalTo(id) { // coGroup on your join key(s)
  (left: Iterator[T], right: Iterator[U], out: Collector[T]) => // T, U: element types of ds1, ds2
    if (!right.hasNext) // nothing exists in the right dataset; note the negation (exclamation mark) in front
      while (left.hasNext) // emit all the left elements
        out.collect(left.next())
}
In short, the coGroup acts like a full outer join on the key, and you keep only the left elements that have [ at least one (exists) | no (not exists) ] matching element on the right.
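To make the full-outer-join view concrete, here is a minimal model in plain Scala collections, with no Flink involved. The names CoGroupModel and coGroupFilter and the sample data are made up for illustration; this only mimics the per-key grouping that coGroup does, it is not how Flink implements it.

```scala
object CoGroupModel {
  // Hypothetical helper: group both sides by key (like a full outer join on the key),
  // then let `keep` decide from the right-hand group whether the left-hand rows are emitted.
  def coGroupFilter[K, A, B](left: Seq[A], right: Seq[B])
                            (leftKey: A => K, rightKey: B => K)
                            (keep: Seq[B] => Boolean): Seq[A] = {
    val leftGroups = left.groupBy(leftKey)
    val rightGroups = right.groupBy(rightKey)
    // Every key that occurs on either side gets visited once.
    val keys = (leftGroups.keySet ++ rightGroups.keySet).toSeq
    keys.flatMap { k =>
      val rightGroup = rightGroups.getOrElse(k, Seq.empty)
      if (keep(rightGroup)) leftGroups.getOrElse(k, Seq.empty) else Seq.empty
    }
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample data: (id, payload)
    val ds1 = Seq((1, "a"), (2, "b"), (3, "c"))
    val ds2 = Seq((1, "x"), (1, "y"), (4, "z"))

    // exists: keep left rows whose key has at least one right row
    val exists = coGroupFilter(ds1, ds2)(_._1, _._1)(_.nonEmpty)
    // not exists: keep left rows whose key has no right row
    val notExists = coGroupFilter(ds1, ds2)(_._1, _._1)(_.isEmpty)

    // Key order is unspecified, so sort before comparing.
    assert(exists.sortBy(_._1) == Seq((1, "a")))
    assert(notExists.sortBy(_._1) == Seq((2, "b"), (3, "c")))
  }
}
```

Note that id 1 appears twice in ds2 but (1, "a") is emitted only once, which is why the coGroup variant needs no distinct, unlike the join variant.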
This is just a sketch written on my smartphone; you should adapt it to your query.
cheers