Hi,

I performed the YARN setup on a cluster running Apache Hadoop 2.3.0-cdh5.1.3, as described on the website.

The job takes two large input files (~9 GB). After filtering and converting them with a FlatMap (selectivity is below 1%), it joins each of them twice with a small data set (< 1 MB); the join results are then joined with each other. The result is about 2.7 GB.

Log excerpt:

10:54:08,832 INFO org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
10:54:08,832 INFO org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 5
10:54:09,589 INFO org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
10:54:09,590 INFO org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 128

Any idea what causes this?

Cheers,
Robert
Hi!

It looks like the job is running with a DOP (degree of parallelism) of one. Can you set the DOP higher? You can do that either directly on the ExecutionEnvironment or (preferably) through the "-p" parameter on the command line.

You are using 0.6, is that correct? (It looks like it from the logs.)

Stephan

On Mon, Oct 13, 2014 at 1:07 PM, Robert Waury <[hidden email]> wrote:
Yes, I'm running 0.6.1. Setting the DOP manually worked, thanks.

On Mon, Oct 13, 2014 at 1:23 PM, Stephan Ewen <[hidden email]> wrote:
Not in the 0.6.1 release, no. With the upcoming 0.7-incubating release, you can set the number of task slots per container (-s flag), and this value will automatically be used as the default DOP.

On Mon, Oct 13, 2014 at 2:09 PM, Robert Waury <[hidden email]> wrote:
There is a ticket open for that: configuring the default DOP based on the number of containers and slots. It is not implemented yet, though.

On Mon, Oct 13, 2014 at 2:09 PM, Robert Waury <[hidden email]> wrote:
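To make the idea behind that ticket concrete, here is a minimal Java sketch. It assumes that the default DOP is simply the total number of task slots, i.e. containers × slots per container. The class DefaultDop and its method defaultDop are hypothetical illustrations, not Flink API and not the actual CliFrontend logic.

```java
/**
 * Hypothetical sketch (not Flink code): derive a default degree of
 * parallelism (DOP) for a YARN session from the number of containers
 * and the task slots per container. The formula containers * slots is
 * an assumption made for illustration only.
 */
public class DefaultDop {

    // Returns the total number of task slots, used here as the default DOP.
    static int defaultDop(int containers, int slotsPerContainer) {
        if (containers < 1 || slotsPerContainer < 1) {
            throw new IllegalArgumentException(
                "containers and slotsPerContainer must both be >= 1");
        }
        // multiplyExact throws ArithmeticException on int overflow
        return Math.multiplyExact(containers, slotsPerContainer);
    }

    public static void main(String[] args) {
        // Example: 4 containers, each started with 2 slots (-s 2)
        System.out.println(defaultDop(4, 2)); // prints 8
    }
}
```

Under this assumption, a session with 4 containers and 2 slots each would default to a DOP of 8 instead of the DOP of 1 visible as "(1/1)" in the log excerpt.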
Are you referring to https://issues.apache.org/jira/browse/FLINK-968?

So, as I said, users can pass the "-s" parameter to set the number of slots per container, and that number is used by the CliFrontend.

How slots are used is basically undocumented, even though it is a very important concept for properly configuring and running Flink (see https://issues.apache.org/jira/browse/FLINK-1157).

On Mon, Oct 13, 2014 at 2:13 PM, Stephan Ewen <[hidden email]> wrote: