Re:

Posted by Robert Waury on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/no-subject-tp165p167.html

Yes, I'm running 0.6.1

Setting DOP manually worked, thanks.

Computation time is now down to around 100 seconds.

Is there a way to let Flink figure out the DOP automatically within a YARN application, or do I always have to set it manually?

Cheers,
Robert



On Mon, Oct 13, 2014 at 1:23 PM, Stephan Ewen <[hidden email]> wrote:
Hi!

It looks like the job is running with a DOP of one.

Can you set the DOP higher? Either directly on the ExecutionEnvironment, or (preferably) through the "-p" parameter on the command line.
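
As a rough illustration of picking a value to pass, here is a minimal sketch that derives a candidate DOP from the cluster figures quoted in this thread (18 containers, 8 cores each). The class DopEstimate and the method suggestDop are hypothetical helpers, not part of Flink, and the rule of one parallel task per core across all containers is an assumption, not a recommendation from the Flink documentation.

```java
// Hypothetical helper, not part of Flink: estimates a degree of parallelism (DOP)
// under the assumption of one parallel task per core across all containers.
final class DopEstimate {

    // Returns containers * coresPerContainer, rejecting non-positive inputs.
    public static int suggestDop(int containers, int coresPerContainer) {
        if (containers < 1 || coresPerContainer < 1) {
            throw new IllegalArgumentException("containers and cores must be positive");
        }
        // multiplyExact throws ArithmeticException on int overflow instead of wrapping.
        return Math.multiplyExact(containers, coresPerContainer);
    }

    public static void main(String[] args) {
        // Cluster figures taken from the original post in this thread.
        int dop = suggestDop(18, 8);

        // On the command line the value would go to the "-p" parameter.
        System.out.println("-p " + dop);

        // Inside the program, Flink 0.6 takes the same value via
        // ExecutionEnvironment#setDegreeOfParallelism(int), for example:
        //   env.setDegreeOfParallelism(dop);
    }
}
```

Whether every container actually offers that many task slots depends on how the YARN session was started, so the computed value is best treated as an upper bound to experiment with.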

You are using 0.6, is that correct? (Looks like it from the logs)

Stephan


On Mon, Oct 13, 2014 at 1:07 PM, Robert Waury <[hidden email]> wrote:
Hi,

I performed the YARN setup on a cluster running Apache Hadoop 2.3.0-cdh5.1.3 as described on the website.

I could see the allocated containers in the YARN ResourceManager, and after starting a Flink job via the CLI client it showed up on the Flink Dashboard.

The problem is that the job, which runs in about 17 minutes in my local VM (3 cores, 4 GB RAM, input from local files), now takes about 25 minutes on the cluster (18 containers with 4 GB and 8 cores each, input from HDFS with rf=5).

From the Flink log it seemed all data was shuffled to a single machine even for FlatMap operations.

log excerpt:
10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 5
10:54:09,589 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
10:54:09,590 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 128

The job takes two large input files (~9 GB). After filtering and converting them with a FlatMap (selectivity is below 1%), it joins each of them twice with a small data set (< 1 MB); the join results are then joined with each other. The result is about 2.7 GB.

Any idea what causes this?

Cheers,
Robert