Hi, join with two columns of both tables


Philip Lee
I want to join two tables on two columns, like this:
//     AND sr_customer_sk = ws_bill_customer_sk
//     AND sr_item_sk     = ws_item_sk
val srJoinWs = storeReturn.join(webSales).where(_._item_sk).equalTo(_._item_sk){
  (storeReturn: StoreReturn, webSales: WebSales, out: Collector[(Long,Long,Long)]) =>
    if(storeReturn._customer_sk.equals(webSales._bill_customer_sk))
      out.collect(storeReturn._item_sk,storeReturn._customer_sk,storeReturn._ticket_number)
    else
      None
}
According to the explanation of the join phase, this is how I should write it if I want to join on both columns. Is that right?
However, it does not work; it fails with a type mismatch: expected TypeInformation[Long], actual (StoreReturn, WebSales, Collector[(Long,Long,Long)]) => Any
I have tried many variations, but it still does not work.
Any suggestions?
Best Regards,
Phil

Re: Hi, join with two columns of both tables

Fabian Hueske-2
Why don't you use a composite key for the Flink join (first.join(second).where(0,1).equalTo(2,3).with(...))?
This would be more efficient and you can omit the check in the join function.

Best, Fabian
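
For readers following along, the composite-key suggestion could be sketched in the Scala DataSet API roughly as below. This is a minimal sketch under assumptions, not code from the thread: the two case classes are hypothetical, trimmed to the fields mentioned in the question, and the fields are assumed to be Long. The object and method names are made up for illustration. The join function is passed through an explicit .apply(...), since the type mismatch in the question most likely arises because the block written directly after equalTo(_._item_sk) is taken as the argument for equalTo's implicit TypeInformation parameter.

```scala
import org.apache.flink.api.scala._

// Hypothetical, trimmed-down record types; only the fields used in the
// question are included, and their types are assumed to be Long.
case class StoreReturn(_item_sk: Long, _customer_sk: Long, _ticket_number: Long)
case class WebSales(_item_sk: Long, _bill_customer_sk: Long)

object CompositeKeyJoin {

  // Joins on (item, customer) as one composite key, so the join function
  // no longer needs the extra customer check.
  def joinOnBothColumns(
      storeReturn: DataSet[StoreReturn],
      webSales: DataSet[WebSales]): DataSet[(Long, Long, Long)] =
    storeReturn
      .join(webSales)
      .where(sr => (sr._item_sk, sr._customer_sk))
      .equalTo(ws => (ws._item_sk, ws._bill_customer_sk))
      // Explicit .apply keeps the join function separate from equalTo's
      // implicit parameter list.
      .apply((sr: StoreReturn, ws: WebSales) =>
        (sr._item_sk, sr._customer_sk, sr._ticket_number))

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Made-up sample data: only item 1 / customer 10 matches on both columns.
    val storeReturn = env.fromElements(
      StoreReturn(1L, 10L, 100L),
      StoreReturn(2L, 20L, 200L))
    val webSales = env.fromElements(
      WebSales(1L, 10L),
      WebSales(2L, 99L))

    joinOnBothColumns(storeReturn, webSales).print()
  }
}
```

Using tuple-valued key selectors keeps the field names readable; for tuple data sets, the positional form where(0,1).equalTo(2,3) from the reply above expresses the same composite key.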

In reply to the message from Philip Lee, 2015-11-08 19:13 GMT+01:00.