Hashjoin implementation

Hashjoin implementation

Benjamin Burkhardt
Hi,

Can anyone tell me where the partitioning function (shuffle phase) of the default hybrid hash join is implemented?
Even after some deeper digging I was not able to figure out where it is located.

Might it be somewhere here?
-> https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash

Thanks in advance.

Benjamin
Re: Hashjoin implementation

vino yang
Hi Benjamin,

That package is roughly the right place; the more precise location is here. [1]

Specifically, Hash Join is divided into two steps:

1) build side
2) probe side
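
To make the two steps concrete, here is a minimal in-memory sketch in plain Java. It is not Flink's implementation (the runtime classes also handle memory segments and spilling to disk); the class `HashJoinSketch` and the `Customer` and `Order` types are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative in-memory hash join; NOT Flink's implementation. */
public class HashJoinSketch {

    /** Hypothetical build-side record. */
    static final class Customer {
        final int customerId;
        final String name;
        Customer(int customerId, String name) {
            this.customerId = customerId;
            this.name = name;
        }
    }

    /** Hypothetical probe-side record. */
    static final class Order {
        final int orderId;
        final int customerId;
        Order(int orderId, int customerId) {
            this.orderId = orderId;
            this.customerId = customerId;
        }
    }

    static List<String> join(List<Customer> buildSide, List<Order> probeSide) {
        // 1) Build side: index every build-side record by its join key.
        Map<Integer, List<Customer>> table = new HashMap<>();
        for (Customer c : buildSide) {
            table.computeIfAbsent(c.customerId, k -> new ArrayList<>()).add(c);
        }
        // 2) Probe side: look up the key of each probe-side record in the table.
        List<String> result = new ArrayList<>();
        for (Order o : probeSide) {
            for (Customer c : table.getOrDefault(o.customerId, List.of())) {
                result.add(c.name + " -> order " + o.orderId);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Customer> customers = List.of(new Customer(1, "Mayer"), new Customer(2, "Smith"));
        List<Order> orders = List.of(new Order(10, 1), new Order(11, 1), new Order(12, 3));
        // Order 12 has no matching customer, so an inner join drops it.
        System.out.println(join(customers, orders));
    }
}
```

The hash table is built once from one input and then only read while the other input streams past, which is why the smaller input is usually the better choice for the build side.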

Thanks, vino.


On Mon, Sep 10, 2018 at 10:09 PM, Benjamin Burkhardt <[hidden email]> wrote:

Re: Hashjoin implementation

Benjamin Burkhardt
Hi vino,

Thanks.

I ran a join on two DataSets and wrote the result to disk, and the results were correct.
However, I was not able to identify the point at which the hash table is built. (Is HashPartition.java not used in this case?)

Do you have an idea why I cannot find it?


Here is a part of my code:
// Two small in-memory inputs, each also registered as a table.
DataSet<RowCustomers> customers = env.fromElements(new RowCustomers(1, "Mayer"));
tEnv.registerDataSet("Customers", customers, "customerID, customerName");
DataSet<RowOrders> orders = env.fromElements(new RowOrders(1, 1, new String[]{"Pen", "Paper"}, "22.08.2018"));
tEnv.registerDataSet("Orders", orders, "orderID, customerID, items, date");
// REPARTITION_HASH_FIRST: repartition both inputs and build the hash table from the first one.
DataSet<Tuple2<RowCustomers, RowOrders>> result = customers.join(orders, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where("customerID").equalTo("customerID");
result.writeAsText("/");

Thanks a lot.
Benjamin


On 11.09.2018 at 04:24, vino yang <[hidden email]> wrote:


Re: Hashjoin implementation

vino yang
Hi Benjamin,

Do you mean that you expect to see HashPartition.java while writing your program?
I think there may be a misunderstanding.
When you write a program, you only use the Flink DataSet API, which is just a way to describe the job logic.
The class you are looking for lives in the flink-runtime module and is only used at runtime.
Therefore, you will not come across it while writing the job program.
If you really need to observe it, you can add some logging to the relevant methods in HashPartition.java,
but you then need to rebuild flink-runtime from source and replace the jar of the same name in the Flink distribution.
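
As a rough picture of the runtime work that a DataSet program only describes: with a repartition join hint, records of both inputs are routed by the hash of their join key, so that equal keys end up on the same parallel instance before any hash table is built. The sketch below is plain Java, not Flink's actual code; `RepartitionSketch`, `targetPartition` and `repartition` are hypothetical names, and the simple modulo scheme is an assumption chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative hash repartitioning by join key; NOT Flink's actual code. */
public class RepartitionSketch {

    /** Picks the target parallel instance for a join key (hypothetical helper). */
    static int targetPartition(Object key, int parallelism) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Splits the keys into one bucket per parallel instance. */
    static List<List<Integer>> repartition(List<Integer> keys, int parallelism) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Integer key : keys) {
            partitions.get(targetPartition(key, parallelism)).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Both occurrences of key 1 land in the same partition.
        System.out.println(repartition(List.of(1, 2, 3, 4, 5, 1), 2));
    }
}
```

Because both inputs are routed with the same function, each parallel instance can then run its build and probe steps on its own share of the keys.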

Thanks, vino.


On Wed, Sep 12, 2018 at 12:31 AM, Benjamin Burkhardt <[hidden email]> wrote: