Re: Tasks, slots, and partitioned joins
Posted by Fabian Hueske-2
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Tasks-slots-and-partitioned-joins-tp16415p16424.html
Hi David,
Flink's DataSet API schedules one slice of a program to a task slot. A program slice is one parallel instance of each operator of a program.
When all operators of your program run with a parallelism of 1, you end up with only 1 slice, which runs on a single slot.
Flink's DataSet API leverages data parallelism (running parallel instances of the same operator on different workers, each working on a different data partition) instead of task parallelism (running different operators on different workers).
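As a rough illustration of the slice idea, here is a plain-Java sketch that uses no Flink classes. The class name DataParallelSketch, its helper methods, and the map/filter/sum pipeline are all made up for this example, and a thread stands in for a task slot. Each "slice" runs one instance of every operator on its own partition of the data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java illustration only; this is not how Flink is implemented.
class DataParallelSketch {

    // Round-robin the input into `parallelism` partitions (hypothetical helper).
    static List<List<Integer>> partition(List<Integer> data, int parallelism) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            parts.add(new ArrayList<>());
        }
        for (int i = 0; i < data.size(); i++) {
            parts.get(i % parallelism).add(data.get(i));
        }
        return parts;
    }

    // One slice: one instance of each operator (map, filter, sum), chained.
    static long runSlice(List<Integer> partition) {
        long sum = 0;
        for (int value : partition) {
            int doubled = value * 2;      // "map" operator
            if (doubled % 3 == 0) {       // "filter" operator
                sum += doubled;           // partial "sum" operator
            }
        }
        return sum;
    }

    // Runs one slice per thread; each thread stands in for a task slot.
    static long run(List<Integer> data, int parallelism) {
        ExecutorService slots = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<Long>> partials = new ArrayList<>();
            for (List<Integer> part : partition(data, parallelism)) {
                partials.add(slots.submit(() -> runSlice(part)));
            }
            long total = 0;
            for (Future<Long> partial : partials) {
                total += partial.get();   // combine the partial results
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("slice failed", e);
        } finally {
            slots.shutdown();
        }
    }
}
```

With a parallelism of 1, run() creates a single partition and a single slice, which mirrors the single-slot case described above. Raising the parallelism changes only how many slices run, not which operators each slice contains.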
Regarding your task, I would implement a custom InputFormat that extends FileInputFormat. The FileInputFormat.open() [1] method is called with a FileInputSplit [2], which contains the file path. You can store the path there and add it as an additional field when emitting records in the nextRecord() method.
This way, you only need two sources (one for /source1 and one for /source2) and can join the records on a composite key of filename and join key. This should balance the load evenly over a larger number of keys.
However, you would lose the advantage of pre-partitioned files because all data of source1 would be joined with all data of source2.
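The tagging and composite-key idea can be sketched in plain Java without any Flink classes. Everything here is hypothetical: the class CompositeKeyJoinSketch, the Tagged type, the "key,payload" line format, and the assumption that matching files carry the same name under /source1 and /source2. readSplit() stands in for what open() and nextRecord() would do together in a custom InputFormat.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Flink API is used here.
class CompositeKeyJoinSketch {

    // A record tagged with the name of the file it came from (hypothetical type).
    static final class Tagged {
        final String fileName;
        final String joinKey;
        final String payload;

        Tagged(String fileName, String joinKey, String payload) {
            this.fileName = fileName;
            this.joinKey = joinKey;
            this.payload = payload;
        }

        // Composite key of file name and join key; NUL separates the two parts.
        String compositeKey() {
            return fileName + '\u0000' + joinKey;
        }
    }

    // Remembers the split's path and attaches the file name to every record.
    // Assumes each line has the form "key,payload".
    static List<Tagged> readSplit(String path, List<String> lines) {
        String fileName = path.substring(path.lastIndexOf('/') + 1);
        List<Tagged> records = new ArrayList<>();
        for (String line : lines) {
            int comma = line.indexOf(',');
            records.add(new Tagged(fileName,
                    line.substring(0, comma), line.substring(comma + 1)));
        }
        return records;
    }

    // Hash join of the two sources on (fileName, joinKey).
    static List<String> join(List<Tagged> left, List<Tagged> right) {
        Map<String, List<Tagged>> index = new HashMap<>();
        for (Tagged r : right) {
            index.computeIfAbsent(r.compositeKey(), k -> new ArrayList<>()).add(r);
        }
        List<String> out = new ArrayList<>();
        for (Tagged l : left) {
            List<Tagged> matches =
                    index.getOrDefault(l.compositeKey(), Collections.emptyList());
            for (Tagged r : matches) {
                out.add(l.fileName + ":" + l.joinKey + ":" + l.payload + "|" + r.payload);
            }
        }
        return out;
    }
}
```

Because the file name is part of the key, a record from /source1/part-0 only ever matches records from /source2/part-0, even when the same join key also appears in other files.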
There is a low-level interface to leverage pre-partitioned files. With SplitDataProperties [3] you can specify that the data produced by a DataSource [4] is partitioned by InputSplit.
If you implement the source such that a single split contains the information needed to read both files, you can avoid an additional shuffle and join locally. This is a manual, low-level optimization where you need to know what you are doing. I'm not sure whether this is documented anywhere except the Javadocs.
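To make the "one split reads both files" idea concrete, here is a plain-Java sketch that does not use SplitDataProperties, DataSource, or any other Flink class. PairedSplitSketch, PairedSplit, and the in-memory map that stands in for the file system are all made up for this example. It also assumes that matching files share a name in both directories and that lines have the form "key,payload".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; a real source would be a custom InputFormat.
class PairedSplitSketch {

    // Hypothetical split that names one file from each source.
    static final class PairedSplit {
        final String leftPath;
        final String rightPath;

        PairedSplit(String leftPath, String rightPath) {
            this.leftPath = leftPath;
            this.rightPath = rightPath;
        }
    }

    // One split per file name, pairing the file in leftDir with the one in rightDir.
    static List<PairedSplit> createSplits(String leftDir, String rightDir,
                                          List<String> fileNames) {
        List<PairedSplit> splits = new ArrayList<>();
        for (String name : fileNames) {
            splits.add(new PairedSplit(leftDir + "/" + name, rightDir + "/" + name));
        }
        return splits;
    }

    // Joins the two files of one split locally; no data leaves the split.
    // `files` maps a path to its lines and stands in for the file system.
    static List<String> joinSplit(PairedSplit split, Map<String, List<String>> files) {
        Map<String, List<String>> index = new HashMap<>();
        for (String line : files.get(split.rightPath)) {
            int comma = line.indexOf(',');
            index.computeIfAbsent(line.substring(0, comma), k -> new ArrayList<>())
                 .add(line.substring(comma + 1));
        }
        List<String> out = new ArrayList<>();
        for (String line : files.get(split.leftPath)) {
            int comma = line.indexOf(',');
            String key = line.substring(0, comma);
            List<String> matches = index.getOrDefault(key, Collections.emptyList());
            for (String rightPayload : matches) {
                out.add(key + ":" + line.substring(comma + 1) + "|" + rightPayload);
            }
        }
        return out;
    }
}
```

Each split is processed independently, so the number of splits (here, the number of file pairs) bounds how much parallelism this approach can use.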
Hope this helps,
Fabian
[2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java#L34
[3] https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java#L101
[4] https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java#L117