http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Tasks-slots-and-partitioned-joins-tp16415p16440.html
Hi David,
please find my answers below:
1. For high utilization, all slots should be filled. Each slot processes a slice of the program on a slice of the data. In case of partitioning or a change of parallelism, the data is shuffled accordingly.
2. That's a good question. I think the default logic is to assign slots round-robin across the TMs, as you suggested, but I'm not 100% sure. There are a couple of exceptions and special cases, IIRC.
3. No, I would still use Flink's join operator to do the join. When you read both files with the same split, you'd have a single source for both inputs. You could do something like:
/-- Filter "/source1" --\
Source -< >-Join-->
\-- Filter "/source2" --/
If all operators have the same parallelism and the source has the right split properties configured, all data should stay local and the join would work without partitioning the data.
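A plain-Java sketch of that pattern (the record shape and names are hypothetical, and java.util.stream stands in for the Flink operators here, since the real program depends on your input formats):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Stand-in for the topology above: one "source" emits (path, key, value)
// records for both files, two filters split them by path, and the join
// matches records on the key locally -- no shuffle is needed because both
// sides of the join live in the same subtask.
public class LocalJoinSketch {

    record Rec(String path, int key, String value) {}

    static List<String> joinLocally(List<Rec> source) {
        // Filter "/source1" side and index it by key.
        Map<Integer, String> left = source.stream()
            .filter(r -> r.path().equals("/source1"))
            .collect(Collectors.toMap(Rec::key, Rec::value));
        // Filter "/source2" side and join it against the left side on the key.
        return source.stream()
            .filter(r -> r.path().equals("/source2"))
            .filter(r -> left.containsKey(r.key()))
            .map(r -> left.get(r.key()) + "|" + r.value())
            .collect(Collectors.toList());
    }
}
```

In the actual Flink program, the two filters and the join would be DataSet operators chained behind the single source, with matching parallelism on all of them.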
You could go even further if the data in the files is sorted on the join key. Then you could read in zig-zag fashion from both files and declare sorted split properties. In theory, the join would then happen without a sort (I haven't tried this, though).
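The zig-zag read is just a two-way merge: always pull from whichever input currently has the smaller head key, so the combined stream comes out globally sorted. A minimal sketch (plain Java, not Flink API; integer keys assumed for simplicity):

```java
import java.util.ArrayList;
import java.util.List;

// Merge two inputs that are each sorted on the join key, reading in
// "zig-zag" fashion so the output stays globally sorted. With sorted split
// properties declared, a sort-merge join could then skip its sort phase.
public class ZigZagMerge {
    static List<Integer> merge(List<Integer> a, List<Integer> b) {
        List<Integer> out = new ArrayList<>();
        int i = 0, j = 0;
        // Zig-zag: take from whichever input has the smaller head element.
        while (i < a.size() && j < b.size()) {
            if (a.get(i) <= b.get(j)) out.add(a.get(i++));
            else out.add(b.get(j++));
        }
        // Drain whichever input still has records left.
        while (i < a.size()) out.add(a.get(i++));
        while (j < b.size()) out.add(b.get(j++));
        return out;
    }
}
```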
4.a Yes, that is true. FileInputFormat has a flag to prevent files from being split up into multiple splits.
4.b You might be able to hack this with a custom InputSplitAssigner. The SplitDataProperties partition methods have a partitioner ID field. IIRC, this is used to determine equal partitioning for joins.
However, as I said, you need to make sure that the files with the same keys are read by the same subtask. You could do that with a custom InputSplitAssigner.
My proposal to read both files with the same key in the same input split (with a single source) tried to go around this issue by forcing the data of both files in the same subtask.
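For illustration, the assignment logic such a custom assigner would need could look like this (the class and method names are simplified stand-ins, not the actual Flink InputSplitAssigner interface): splits carrying the same partition key must always be handed to the same subtask, so co-partitioned files end up co-located for the join.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a keyed split-assignment policy: deterministically map each
// split's partition key to a subtask index, so all splits (i.e., files)
// sharing a key are read by the same parallel instance of the source.
public class KeyedSplitAssigner {

    // Deterministic key -> subtask mapping; floorMod keeps it non-negative.
    static int targetSubtask(String partitionKey, int parallelism) {
        return Math.floorMod(partitionKey.hashCode(), parallelism);
    }

    // Build the full assignment plan: subtask index -> splits it should read.
    static Map<Integer, List<String>> assign(List<String> splitKeys, int parallelism) {
        Map<Integer, List<String>> plan = new HashMap<>();
        for (String key : splitKeys) {
            plan.computeIfAbsent(targetSubtask(key, parallelism), t -> new ArrayList<>())
                .add(key);
        }
        return plan;
    }
}
```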
4.c. The concept of a partition is a bit different in Flink and not bound to InputSplits. All data arriving at a parallel instance of an operator is considered to be in the same partition.
So both FlatMap and MapPartition call open() just once. In MapPartition, the mapPartition() method is also called just once per partition, while flatMap() is called once for each record.
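The call pattern can be simulated in plain Java (this is not the Flink API, just a counting sketch): per partition, open() runs once in both operators; flatMap() then runs once per record, while mapPartition() runs once for the whole partition and receives all records through a single iterator.

```java
import java.util.List;

// Counting sketch of the invocation semantics described above.
public class CallCounts {

    static String runFlatMap(List<String> partition) {
        int opens = 1;                 // open() once per partition
        int calls = 0;
        for (String rec : partition) {
            calls++;                   // flatMap(rec) invoked once per record
        }
        return "open=" + opens + ",calls=" + calls;
    }

    static String runMapPartition(List<String> partition) {
        int opens = 1;                 // open() once per partition
        int calls = 1;                 // mapPartition(iterator) invoked once
        partition.iterator();          // all records flow through one iterator
        return "open=" + opens + ",calls=" + calls;
    }
}
```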
Cheers, Fabian