this.recordBuildSideAccessor = RecordSerializer.get();
this.recordProbeSideAccessor = RecordSerializer.get();

// Single-column join key on each side, typed as BytesValue.
final int[] buildKeyPos = new int[] { buildSideJoinIndex };
final int[] probeKeyPos = new int[] { probeSideJoinIndex };
final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[] { BytesValue.class };

this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);

Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);

// Allocate all of the memory manager's pages for the hash table.
List<MemorySegment> memorySegments;
int numPages = hashTableMemoryManager.getTotalNumPages();
try {
    memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, numPages);
} catch (MemoryAllocationException e) {
    LOGGER.error("could not allocate " + numPages + " pages of memory for HashJoin", e);
    Throwables.propagate(e); // always throws; the return below satisfies the compiler
    return;
}

try {
    Stopwatch stopwatch = Stopwatch.createStarted();
    UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
    UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);

    join = new MutableHashTable<Record, Record>(
            recordBuildSideAccessor,
            recordProbeSideAccessor,
            recordBuildSideComparator,
            recordProbeSideComparator,
            pactRecordComparator,
            memorySegments,
            ioManager);

    // open() consumes the entire build side to construct the hash table.
    join.open(buildInput, probeInput);
    LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");

    // Probe phase: for each probe record, iterate over all matching build-side records.
    while (join.nextRecord()) {
        Record currentProbeRecord = join.getCurrentProbeRecord();
        MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
        while (buildSideIterator.next(reusedBuildSideRow) != null) {
            materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
            materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
            rowNum++;
        }
    }
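The excerpt ends inside the outer try block. Presumably the surrounding method closes it with cleanup that releases the table and returns the pages to the memory manager; a minimal sketch of that cleanup, assuming the field names used above:

    } finally {
        // MutableHashTable.close() releases the table's internal buffers and partitions;
        // release() hands the pages allocated above back to the MemoryManager.
        join.close();
        hashTableMemoryManager.release(memorySegments);
    }

Without this, the pages allocated above would not be returned to the pool.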
Hot spot,"Self time (microseconds)","Average Time","Invocations" | |||||||||
org.apache.flink.types.Record.serialize,1014127,"n/a","n/a" | |||||||||
org.apache.flink.types.Record.deserialize,60684,"n/a","n/a" | |||||||||
org.apache.flink.types.Record.copyTo,83007,"n/a","n/a" | |||||||||
org.apache.flink.runtime.operators.hash.MutableHashTable.open,55238,"n/a","n/a" | |||||||||
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord,10955,"n/a","n/a" | |||||||||
org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a" | |||||||||
org.apache.flink.runtime.memory.MemoryManager.allocatePages,104259,"n/a","n/a" My log show that hashjoin.open() method costs too much time. > construct hash table elapsed:1885ms |
Hi Weijie,

it might be the case that batching the processing of multiple rows gives you improved performance compared to single-row processing. Maybe you could share the exact benchmark baseline results and the code you use to test Flink's MutableHashTable with us. The Flink configuration and how you run it would also be of interest. That way we might be able to see if we can tune Flink a bit more.

Cheers,
Till

On Sun, May 14, 2017 at 5:23 AM, weijie tong <[hidden email]> wrote:

> I have a test case that uses Flink's MutableHashTable class to do a hash join on a local machine with 64 GB of memory and 64 cores. The build table has 140,000 rows, the probe table has 3,200,000 rows, and the join produces 120,000 matched rows. The join takes 2.2 seconds to complete, which seems slow. I made sure there is no overflow and that the smaller table is the build side. The MutableObjectIterator is a sequence of Rows, where each Row is composed of several byte[] fields.
>
> My log shows that the open() method takes 1.560 seconds and the probe phase takes 680 ms. My JProfiler output shows that the MutableObjectIterator's next() call is the hotspot. I want to know how to tune this scenario.
>
> I noticed that Drill's HashJoin uses a batch model: its build-side input is a RecordBatch that holds a batch of rows whose memory footprint is close to the L2 cache size. With this strategy it makes fewer method calls (that is, calls to next()) and uses the CPU more efficiently. A SQL Server paper also reports performance gains from the batch model (https://www.microsoft.com/en-us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf). I guess the performance drop is due to the single-row iteration model.
>
> Please correct me if my opinion is wrong, or if I am simply using the MutableHashTable incorrectly. Any advice would be appreciated.
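A side note for readers of this thread: the batch model discussed above amortizes the per-row cost of MutableObjectIterator.next() by pulling rows into a buffer before processing them. Below is a minimal sketch of that idea, not Flink API: the BatchingIterator class and the BATCH_SIZE constant are hypothetical, and batching forces a copy per row, giving up the object-reuse pattern used in the probe loop above.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.types.Record;
    import org.apache.flink.util.MutableObjectIterator;

    /**
     * Hypothetical wrapper illustrating the batch model: rows are pulled
     * from the single-row iterator into a fixed-size buffer, so the hot
     * inner loop scans a contiguous, cache-friendly list instead of
     * crossing the iterator call boundary once per row.
     */
    public class BatchingIterator {

        private static final int BATCH_SIZE = 1024; // tune so a batch fits the L2 cache

        private final MutableObjectIterator<Record> source;
        private final List<Record> batch = new ArrayList<>(BATCH_SIZE);

        public BatchingIterator(MutableObjectIterator<Record> source) {
            this.source = source;
        }

        /** Refills the buffer; returns false once the source is exhausted. */
        public boolean nextBatch() throws IOException {
            batch.clear();
            Record rec;
            while (batch.size() < BATCH_SIZE && (rec = source.next(new Record())) != null) {
                batch.add(rec); // a fresh Record per row: batching forbids in-place reuse
            }
            return !batch.isEmpty();
        }

        public List<Record> currentBatch() {
            return batch;
        }
    }

Whether this wins in practice depends on whether the saved call overhead and improved locality outweigh the extra per-row allocation; the profiler output above suggests that Record.serialize, not iteration alone, dominates the build phase here.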