// AND sr_customer_sk = ws_bill_customer_sk
// AND sr_item_sk = ws_item_sk
val srJoinWs = storeReturn.join(webSales).where(_._item_sk).equalTo(_._item_sk) {
  (storeReturn: StoreReturn, webSales: WebSales, out: Collector[(Long, Long, Long)]) =>
    if (storeReturn._customer_sk.equals(webSales._bill_customer_sk))
      out.collect(storeReturn._item_sk, storeReturn._customer_sk, storeReturn._ticket_number)
    else
      None
}
According to the explanation of the join phase, this seems to be how I should write a join like this. Is that right?
However, it fails with a type mismatch: expected TypeInformation[Long], actual (StoreReturn, WebSales, Collector[(Long,Long,Long)]) => Any
I have tried many variations, but none of them work.
Any suggestions?
Best Regards,
Phil
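
A possible reading of the error, offered as a sketch rather than a confirmed fix: in the Flink Scala DataSet API, equalTo appears to take the key's TypeInformation as an implicit parameter, so a block written directly after equalTo(...) is treated as that implicit argument instead of as the join function. Calling .apply explicitly should avoid this. Separately, "else None" makes the function's result type Any, while the Collector variant expects Unit, so the else branch is dropped here. The case class definitions, the sample data, and the names JoinSketch and joinOnItemAndCustomer are hypothetical, since the real types are not shown in the post.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

// Hypothetical minimal record types; only the fields used in the post are modelled.
case class StoreReturn(_item_sk: Long, _customer_sk: Long, _ticket_number: Long)
case class WebSales(_item_sk: Long, _bill_customer_sk: Long)

object JoinSketch {

  // Joins on item key, then keeps only pairs whose customer keys also match.
  def joinOnItemAndCustomer(
      storeReturn: DataSet[StoreReturn],
      webSales: DataSet[WebSales]): DataSet[(Long, Long, Long)] =
    storeReturn.join(webSales)
      .where(_._item_sk)
      .equalTo(_._item_sk)
      // Explicit .apply so the block is not taken as equalTo's implicit argument.
      .apply { (sr: StoreReturn, ws: WebSales, out: Collector[(Long, Long, Long)]) =>
        // No else branch: the function must return Unit, not Any.
        if (sr._customer_sk == ws._bill_customer_sk)
          out.collect((sr._item_sk, sr._customer_sk, sr._ticket_number))
      }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Made-up sample rows, for illustration only.
    val storeReturn = env.fromElements(StoreReturn(1L, 10L, 100L), StoreReturn(2L, 20L, 200L))
    val webSales = env.fromElements(WebSales(1L, 10L), WebSales(2L, 99L))
    joinOnItemAndCustomer(storeReturn, webSales).print()
  }
}
```

Note that out.collect is given an explicit tuple (double parentheses) here, rather than relying on the compiler to turn three arguments into a tuple.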