Tasks, slots, and partitioned joins

David Dreyfus
Hello -

I have a large number of pairs of files. For the purpose of discussion:
/source1/{1..10000} and /source2/{1..10000}.

I want to join the files pair-wise: /source1/1 joined to /source2/1,
/source1/2 joined to /source2/2, and so on.
I then want to union the results of the pair-wise joins and perform an
aggregate.

I create a simple Flink job that has four sources, two joins, and two sinks
to produce intermediate results. This represents two unrelated chains.

I notice that when running this job with parallelism = 1 on a standalone
machine with one task manager and 3 slots, only one slot gets used.

My concern is that when I scale up to a YARN cluster, Flink will continue to
use one slot on one machine instead of using all slots on all machines.

Prior reading suggests that all the data source subtasks are added to a default
slot sharing group. Downstream tasks (joins and sinks) want to be colocated with
the data sources. The result is that all of my tasks are executed in one slot.

Flink's DataStream API offers the slotSharingGroup() function, but it doesn't
seem to be available to DataSet users.

*Q1:* How do I force Flink to distribute work evenly across task managers
and the slots allocated to them? If this shouldn't be a concern, please
elaborate.

When I scale up the number of unrelated chains, I notice that Flink seems to
start all of them at the same time, which results in thrashing and errors:
lots of IO and errors regarding hash buffers.

*Q2:* Is there any method for controlling the scheduling of tasks so that
some finish before others start? My workaround is to execute multiple
sequential batches with results going into an intermediate directory, and
then a final job that aggregates the results. I would certainly prefer one
job that might avoid the intermediate write.

If I treat /source1 as one data source and /source2 as the second, and then
join the two, Flink will shuffle and partition the files on the join key.
The /source1 and /source2 files already represent this partitioning. Because
they are reused multiple times, I shuffle once and save the results, creating
/source1 and /source2.
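To illustrate why same-numbered files can be joined pair-wise, here is a minimal plain-Java sketch of the pre-partitioning step. It assumes (hypothetically) that both sources were bucketed by a hash of the join key modulo the number of files; `PairedPartitioner`, `bucketOf`, and `partition` are illustrative names, and this is not the hash function Flink itself uses when it shuffles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the pre-partitioning step: both sources use the SAME
// bucket function, so records with equal join keys end up in files with the
// same number (/source1/{b} and /source2/{b}; buckets are numbered from 0 here).
// This is NOT Flink's internal hash partitioning.
final class PairedPartitioner {
    private PairedPartitioner() {}

    // Bucket (file number) for a join key; floorMod keeps it in [0, numBuckets)
    // even when hashCode() is negative.
    static int bucketOf(String joinKey, int numBuckets) {
        return Math.floorMod(joinKey.hashCode(), numBuckets);
    }

    // Groups join keys by bucket, standing in for writing one file per bucket.
    static Map<Integer, List<String>> partition(List<String> joinKeys, int numBuckets) {
        Map<Integer, List<String>> buckets = new TreeMap<>();
        for (String key : joinKeys) {
            buckets.computeIfAbsent(bucketOf(key, numBuckets), b -> new ArrayList<>()).add(key);
        }
        return buckets;
    }
}
```

Because both sides use the same `bucketOf`, a key that lands in bucket b for /source1 also lands in bucket b for /source2, which is what makes the pair-wise join complete.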

*Q3:* Does Flink have a method by which I can mark individual files (or
directories) as belonging to a particular partition so that, when I try to
join them, the unnecessary shuffle and repartition is avoided?

Thank you,
David



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Tasks, slots, and partitioned joins

Fabian Hueske-2
Hi David,

Flink's DataSet API schedules one slice of a program to a task slot. A program slice is one parallel instance of each operator of a program.
When all operators of your program run with a parallelism of 1, you end up with only 1 slice that runs on a single slot.
Flink's DataSet API leverages data parallelism (running parallel instances of the same operator on different workers, each working on a different data partition) instead of task parallelism (running different operators on different workers).
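A minimal plain-Java model of the slice idea (not Flink API; `SliceModel` and `slices` are hypothetical names). It builds one slice per unit of parallelism, each holding one instance of every operator, so a job like the one in the question (four sources, two joins, two sinks) at parallelism 1 yields a single slice and therefore a single busy slot.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Flink API): a program slice holds one parallel
// instance of every operator, and each slice occupies one task slot.
final class SliceModel {
    private SliceModel() {}

    // Builds `parallelism` slices; slice i contains instance i of each operator.
    static List<List<String>> slices(List<String> operators, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            List<String> slice = new ArrayList<>();
            for (String op : operators) {
                slice.add(op + "[" + i + "]");
            }
            result.add(slice);
        }
        return result;
    }
}
```

In a real DataSet job the number of slices follows the parallelism you configure, e.g. with `ExecutionEnvironment#setParallelism(int)` or per operator with `setParallelism(int)`.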

Regarding your task, I would implement a custom InputFormat that extends FileInputFormat. The FileInputFormat.open() [1] method is called with a FileInputSplit [2], which contains the file path. You can keep the path and add it as an additional field to each record you emit in the nextRecord() method.
This way, you only need two sources (one for /source1 and one for /source2) and can join the records on a composite key of filename and join key. This should balance the load evenly over a larger number of keys.
However, you would lose the advantage of pre-partitioned files because all data of source1 would be joined with all data of source2.
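The composite-key approach can be modeled in plain Java as follows. This is a sketch, not Flink code: `CompositeKeyJoin` and `Tagged` are hypothetical, and it assumes the tag is the file name without its directory (for example `1`), so that /source1/1 and /source2/1 share a tag. In a Flink job the tag would come from the `FileInputSplit` passed to `open()`, and the join itself would be Flink's join operator on the two key fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of joining on (file name, join key).
final class CompositeKeyJoin {
    private CompositeKeyJoin() {}

    // A record tagged with the name of its file. The tag is the file name
    // without the directory (e.g. "1"), so /source1/1 and /source2/1 share it.
    static final class Tagged {
        final String file;
        final String key;
        final String value;

        Tagged(String file, String key, String value) {
            this.file = file;
            this.key = key;
            this.value = value;
        }
    }

    // Hash join on the composite key: the build side is indexed by
    // Arrays.asList(file, key), whose equals/hashCode compare element-wise.
    static List<String> join(List<Tagged> left, List<Tagged> right) {
        Map<List<String>, List<String>> build = new HashMap<>();
        for (Tagged l : left) {
            build.computeIfAbsent(Arrays.asList(l.file, l.key), k -> new ArrayList<>()).add(l.value);
        }
        List<String> out = new ArrayList<>();
        for (Tagged r : right) {
            List<String> matches = build.get(Arrays.asList(r.file, r.key));
            if (matches == null) {
                continue; // no left record with this (file, key)
            }
            for (String leftValue : matches) {
                out.add(r.file + ":" + r.key + ":" + leftValue + "," + r.value);
            }
        }
        return out;
    }
}
```

Records with the same join key but different file tags never match, which preserves the pair-wise semantics even though only two sources are used.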

There is a low-level interface to leverage pre-partitioned files. With SplitDataProperties [3] you can specify that the data produced by a DataSource [4] is partitioned by InputSplit.
If you implement the source so that a single split contains the information to read both files, you can avoid an additional shuffle and join locally. This is a manual, low-level optimization where you need to know what you are doing. I'm not sure it is documented anywhere beyond the JavaDocs.
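The split-local join can be sketched in plain Java under clearly stated assumptions: `SplitLocalJoin` and `PairSplit` are hypothetical, join keys are assumed unique within each file, and the rule that the split of pair i goes to subtask i % parallelism is invented for this model (in Flink, split assignment is decided by an InputSplitAssigner). The point is only that each split is joined entirely inside one subtask, so no records are repartitioned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: one split carries both files of a pair and is joined
// where it is read, without moving records between subtasks.
final class SplitLocalJoin {
    private SplitLocalJoin() {}

    // Split carrying BOTH files of one pair; join keys are assumed unique
    // within each file, so each side is a key -> value map.
    static final class PairSplit {
        final int pair;
        final Map<String, String> left;   // stands in for /source1/{pair}
        final Map<String, String> right;  // stands in for /source2/{pair}

        PairSplit(int pair, Map<String, String> left, Map<String, String> right) {
            this.pair = pair;
            this.left = left;
            this.right = right;
        }
    }

    // Joins a split inside the subtask that read it: both sides are local.
    static List<String> joinSplit(PairSplit split) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : split.left.entrySet()) {
            String match = split.right.get(e.getKey());
            if (match != null) {
                out.add(split.pair + ":" + e.getKey() + ":" + e.getValue() + "," + match);
            }
        }
        return out;
    }

    // Invented assignment rule for the model: pair i -> subtask i % parallelism.
    static Map<Integer, List<String>> run(List<PairSplit> splits, int parallelism) {
        Map<Integer, List<String>> perSubtask = new TreeMap<>();
        for (PairSplit split : splits) {
            int subtask = Math.floorMod(split.pair, parallelism);
            perSubtask.computeIfAbsent(subtask, k -> new ArrayList<>()).addAll(joinSplit(split));
        }
        return perSubtask;
    }
}
```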

Hope this helps,
Fabian

[2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java#L34
[3] https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java#L101
[4] https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java#L117






In reply to David Dreyfus's message of 2017-10-26 4:13 GMT+02:00.

Re: Tasks, slots, and partitioned joins

David Dreyfus
Hi Fabian,

Thank you for the great, detailed answers.
1. So, each parallel slice of the DAG is placed into one slot. The key to
high utilization is many slices of the source data (or the various methods
of repartitioning it). Yes?
2. In batch processing, are slots filled round-robin on task managers, or do
I need to tune the number of slots to load the cluster evenly?
3. Are you suggesting that I perform the join in my custom data source?
4. Looking at this sample from
org.apache.flink.optimizer.PropertyDataSourceTest

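  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  DataSource<Tuple2<Long, String>> data =
    env.readCsvFile("/some/path").types(Long.class, String.class);

  // Declare that all records with the same value in field 0 are in the same split.
  data.getSplitDataProperties()
    .splitsPartitionedBy(0);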

4.a Does this code assume that one split == one file from /some/path? If
readCsvFile splits each file, the guarantee that all keys in each part of
the file share the same partition would be violated, right?
4.b Is there a way to mark a partition number so that sources that share
partition numbers are read in parallel and joined? If I have 10,000 pairs, I
want partition 1 read from the sources at the same time.
4.c Does a downstream flatmap function get an open() call for each new
partition? Or, do I chain MapPartition directly to the datasource?

Thank you,
David




Re: Tasks, slots, and partitioned joins

Fabian Hueske-2
Hi David,

please find my answers below:

1. For high utilization, all slots should be filled. Each slot processes a slice of the program on a slice of the data. In case of partitioning or changed parallelism, the data is shuffled accordingly.
2. That's a good question. I think the default logic is to go round-robin on the TMs as you suggested, but I'm not 100% sure. There are a couple of exceptions and special cases, IIRC.
3. No, I would still use Flink's join operator to do the join. When you read both files with the same split, you'd have a single source for both inputs. You could do something like:

                /-- Filter "/source1" --\
Source -<                                  >-Join-->
                \-- Filter "/source2" --/

If all operators have the same parallelism and the source has the right split properties configured, all data should stay local and the join would work without partitioning the data.
You could go even further if the data in the files is sorted on the join key. Then you could read both files in zig-zag fashion and declare sorted split properties. In theory, the join would then happen without a sort (I haven't tried this, though).

4.a Yes, that is true. FileInputFormat has a flag that prevents files from being split into multiple input splits.
4.b You might be able to hack this with a custom InputSplitAssigner. The SplitDataProperties partition methods have a partitioner ID field. IIRC, this is used to determine equal partitioning for joins.
However, as I said, you need to make sure that the files with the same keys are read by the same subtask. You could do that with a custom InputSplitAssigner.
My proposal to read both files with the same key in the same input split (with a single source) tried to work around this issue by forcing the data of both files into the same subtask.
4.c The concept of a partition is a bit different in Flink and is not bound to InputSplits. All data arriving at a parallel instance of an operator is considered to be in the same partition.
So both FlatMap and MapPartition call open() just once. In MapPartition, the mapPartition() method is also called once, while flatMap() is called for each record.
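A plain-Java model (not Flink's runtime; `LifecycleModel` and its method names are hypothetical) of how often each method is called on one parallel instance of the operator. In Flink, `open()` and `close()` would be the methods of a `RichFlatMapFunction` or `RichMapPartitionFunction`; the `close()` count is included for completeness.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the calls ONE parallel operator instance receives
// for all records of its partition.
final class LifecycleModel {
    private LifecycleModel() {}

    // FlatMap-style: open() once, flatMap() once per record, close() once.
    static Map<String, Integer> runFlatMap(List<String> partition) {
        Map<String, Integer> calls = new LinkedHashMap<>();
        calls.merge("open", 1, Integer::sum);
        for (int i = 0; i < partition.size(); i++) {
            calls.merge("flatMap", 1, Integer::sum);
        }
        calls.merge("close", 1, Integer::sum);
        return calls;
    }

    // MapPartition-style: open() once, then a single mapPartition() call that
    // iterates over every record of the partition, then close() once.
    static Map<String, Integer> runMapPartition(List<String> partition) {
        Map<String, Integer> calls = new LinkedHashMap<>();
        calls.merge("open", 1, Integer::sum);
        calls.merge("mapPartition", 1, Integer::sum);
        int seen = 0;
        for (String record : partition) {
            seen++; // records consumed inside the single mapPartition() call
        }
        calls.put("recordsSeenInMapPartition", seen);
        calls.merge("close", 1, Integer::sum);
        return calls;
    }
}
```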

Cheers, Fabian




In reply to David Dreyfus's message of 2017-10-26 15:04 GMT+02:00.