Hello -
I have a large number of pairs of files. For the purposes of discussion: /source1/{1..10000} and /source2/{1..10000}. I want to join the files pair-wise: /source1/1 joined to /source2/1, /source1/2 joined to /source2/2, and so on. I then want to union the results of the pair-wise joins and perform an aggregate.

I create a simple Flink job that has four sources, two joins, and two sinks that produce intermediate results. This represents two unrelated chains. I notice that when running this job with parallelism = 1 on a standalone machine with one task manager and 3 slots, only one slot gets used. My concern is that when I scale up to a YARN cluster, Flink will continue to use one slot on one machine instead of using all slots on all machines.

Prior reading suggests that all the data source subtasks are added to a default resource group, and that downstream tasks (joins and sinks) want to be colocated with the data sources. The result is that all of my tasks execute in one slot. The DataStream API offers the slotSharingGroup() function, but it doesn't seem to be available to the DataSet user.

*Q1:* How do I force Flink to distribute work evenly across task managers and the slots allocated to them? If this shouldn't be a concern, please elaborate.

When I scale up the number of unrelated chains, I notice that Flink seems to start all of them at the same time, which results in thrashing and errors - lots of IO and errors regarding hash buffers.

*Q2:* Is there any method for controlling the scheduling of tasks so that some finish before others start? My workaround is to execute multiple sequential batches, with results going into an intermediate directory, and then run a final job that aggregates the results. I would certainly prefer one job that might avoid the intermediate write.

If I treat /source1 as one data source and /source2 as the second, and then join the two, Flink will shuffle and partition the files on the join key. The /source1 and /source2 files already represent this partitioning: they are reused multiple times, so I shuffle once and save the results, creating /source1 and /source2.

*Q3:* Does Flink have a method by which I can mark individual files (or directories) as belonging to a particular partition so that when I join them, the unnecessary shuffle and repartition is avoided?

Thank you,
David
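A minimal sketch of the kind of job described above, using the DataSet API. The paths, the CSV format, and the Tuple2<Long, String> record layout are assumptions for illustration, not taken from the original message:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class PairwiseJoinJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Two unrelated chains: pair 1 and pair 2, each joined pair-wise.
            DataSet<Tuple2<Long, String>> a1 = env.readCsvFile("/source1/1").types(Long.class, String.class);
            DataSet<Tuple2<Long, String>> a2 = env.readCsvFile("/source2/1").types(Long.class, String.class);
            DataSet<Tuple2<Long, String>> b1 = env.readCsvFile("/source1/2").types(Long.class, String.class);
            DataSet<Tuple2<Long, String>> b2 = env.readCsvFile("/source2/2").types(Long.class, String.class);

            // Each chain: join on the key (field 0), keep the key plus both
            // values, and write an intermediate result.
            a1.join(a2).where(0).equalTo(0)
              .projectFirst(0, 1).projectSecond(1)
              .writeAsCsv("/intermediate/1");

            b1.join(b2).where(0).equalTo(0)
              .projectFirst(0, 1).projectSecond(1)
              .writeAsCsv("/intermediate/2");

            env.execute("pair-wise joins");
        }
    }

With parallelism = 1, every operator above has exactly one subtask, so the whole plan fits into a single slot.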
Hi David,

Flink's DataSet API schedules one slice of a program to a task slot. A program slice is one parallel instance of each operator of a program. When all operators of your program run with a parallelism of 1, you end up with only one slice that runs in a single slot. However, if you treat each directory as a single data source, you would lose the advantage of the pre-partitioned files, because all data of source1 would be joined with all data of source2.

There is a low-level interface to leverage pre-partitioned files. With SplitDataProperties [3] you can specify that the data produced by a DataSource [4] is partitioned by InputSplit. If you implement the source in a way that a single split contains the information to read both files, you can avoid an additional shuffle and join locally. This is a manual low-level optimization where you need to know what you are doing. I'm not sure it is documented anywhere except the Javadocs.

Hope this helps,
Fabian

[3] https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java#L101
[4] https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java#L117
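To make the suggestion concrete, here is a rough sketch of a source whose splits each reference one /source1 + /source2 file pair, so that a single subtask reads both files of a pair. The class names, the side/key/value record layout, and the comma-separated line format are all assumptions; a real implementation would stream the files rather than buffer them in memory:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.ArrayDeque;
    import java.util.Deque;

    import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
    import org.apache.flink.api.common.io.InputFormat;
    import org.apache.flink.api.common.io.statistics.BaseStatistics;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.io.InputSplit;
    import org.apache.flink.core.io.InputSplitAssigner;

    public class PairedFileInputFormat
            implements InputFormat<Tuple3<String, Long, String>, PairedFileInputFormat.PairSplit> {

        /** One split = one pre-partitioned file pair. */
        public static class PairSplit implements InputSplit {
            private final int num;
            final String left;   // e.g. /source1/<n>
            final String right;  // e.g. /source2/<n>
            PairSplit(int num, String left, String right) {
                this.num = num; this.left = left; this.right = right;
            }
            @Override public int getSplitNumber() { return num; }
        }

        private final int numPairs;
        private transient Deque<Tuple3<String, Long, String>> buffered;

        public PairedFileInputFormat(int numPairs) { this.numPairs = numPairs; }

        @Override public void configure(Configuration parameters) {}
        @Override public BaseStatistics getStatistics(BaseStatistics cached) { return cached; }

        @Override public PairSplit[] createInputSplits(int minNumSplits) {
            PairSplit[] splits = new PairSplit[numPairs];
            for (int i = 0; i < numPairs; i++) {
                splits[i] = new PairSplit(i, "/source1/" + (i + 1), "/source2/" + (i + 1));
            }
            return splits;
        }

        @Override public InputSplitAssigner getInputSplitAssigner(PairSplit[] splits) {
            return new DefaultInputSplitAssigner(splits);
        }

        @Override public void open(PairSplit split) throws IOException {
            // Buffer both files of the pair; records are tagged with their side
            // so downstream filters can separate the two join inputs again.
            buffered = new ArrayDeque<>();
            readInto("left", split.left);
            readInto("right", split.right);
        }

        private void readInto(String side, String path) throws IOException {
            for (String line : Files.readAllLines(Paths.get(path))) {
                String[] f = line.split(",", 2);  // assumed "key,value" line format
                buffered.add(Tuple3.of(side, Long.parseLong(f[0]), f[1]));
            }
        }

        @Override public boolean reachedEnd() { return buffered.isEmpty(); }

        @Override public Tuple3<String, Long, String> nextRecord(Tuple3<String, Long, String> reuse) {
            return buffered.poll();
        }

        @Override public void close() {}
    }

Because the default assigner hands splits out lazily to idle subtasks, a source like this would also spread the 10,000 pairs across all available slots as parallelism increases.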
Hi Fabian,
Thank you for the great, detailed answers.

1. So, each parallel slice of the DAG is placed into one slot. The key to high utilization is many slices of the source data (or the various methods of repartitioning it). Yes?

2. In batch processing, are slots filled round-robin on task managers, or do I need to tune the number of slots to load the cluster evenly?

3. Are you suggesting that I perform the join in my custom data source?

4. Looking at this sample from org.apache.flink.optimizer.PropertyDataSourceTest:

    DataSource<Tuple2<Long, String>> data =
        env.readCsvFile("/some/path").types(Long.class, String.class);

    data.getSplitDataProperties()
        .splitsPartitionedBy(0);

4.a Does this code assume that one split == one file from /some/path? If readCsvFile splits each file, the guarantee that all keys in each part of a file belong to the same partition would be violated, right?

4.b Is there a way to mark a partition number so that sources that share partition numbers are read in parallel and joined? If I have 10,000 pairs, I want partition 1 read from both sources at the same time.

4.c Does a downstream flatMap function get an open() call for each new partition? Or do I chain MapPartition directly to the data source?

Thank you,
David
Hi David,

Please find my answers below:

1. For high utilization, all slots should be filled. Each slot processes a slice of the program on a slice of the data. In case of partitioning or changed parallelism, the data is shuffled accordingly.

2. That's a good question. I think the default logic is to go round-robin on the TMs as you suggested, but I'm not 100% sure. There are a couple of exceptions and special cases, IIRC.

3. No, I would still use Flink's join operator to do the join. When you read both files with the same split, you'd have a single source for both inputs. You could do something like:

             /-- Filter "/source1" --\
    Source -<                         >-- Join -->
             \-- Filter "/source2" --/

If all operators have the same parallelism and the source has the right split properties configured, all data should stay local and the join would work without partitioning the data. You could go even further if the data in the files is sorted on the join key. Then you could read in zig-zag fashion from both files and declare sorted split properties. In theory, the join would then happen without a sort (I haven't tried this, though).

4.a Yes, that is true. FileInputFormat has a flag to prevent files from being split up into multiple splits.

4.b You might be able to hack this with a custom InputSplitAssigner. The SplitDataProperties partition methods have a partitioner ID field; IIRC, this is used to determine equal partitioning for joins. However, as I said, you need to make sure that the files with the same keys are read by the same subtask. You could do that with a custom InputSplitAssigner. My proposal to read both files with the same key in the same input split (with a single source) tries to get around this issue by forcing the data of both files into the same subtask.

4.c The concept of a partition is a bit different in Flink and not bound to InputSplits. All data arriving at a parallel instance of an operator is considered to be in the same partition. So both FlatMap and MapPartition call open() just once. In MapPartition the mapPartition() method is also called once, while flatMap() is called for each record.

Cheers,
Fabian
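Pulling the answers to 3. and 4.b together with the paired source sketched earlier, a hypothetical job could look as follows. Whether the optimizer actually drops the shuffle should be verified on the plan (e.g. via env.getExecutionPlan()); the partitioner ID "pairFiles" and all field positions are illustrative assumptions:

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;

    public class LocalPairJoinJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Single source whose splits each carry one file pair (sketch above).
            DataSource<Tuple3<String, Long, String>> pairs = env.createInput(
                    new PairedFileInputFormat(10000),
                    TypeInformation.of(new TypeHint<Tuple3<String, Long, String>>() {}));

            // Declare that the source output is partitioned by input split on the
            // key field (position 1); "pairFiles" is an arbitrary partitioner ID.
            pairs.getSplitDataProperties().splitsPartitionedBy("pairFiles", 1);

            // Split the tagged records back into the two join inputs.
            DataSet<Tuple2<Long, String>> left =
                    pairs.filter(t -> t.f0.equals("left")).project(1, 2);
            DataSet<Tuple2<Long, String>> right =
                    pairs.filter(t -> t.f0.equals("right")).project(1, 2);

            left.join(right).where(0).equalTo(0)
                .projectFirst(0, 1).projectSecond(1)
                .writeAsCsv("/intermediate/joined");

            env.execute("local pair-wise joins");
        }
    }

If the shuffle is indeed avoided, each subtask joins exactly the file pairs whose splits it was assigned, which is the behavior asked for in Q3.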