(no subject)

Robert Waury
Hi,

I performed the YARN setup on a cluster running Apache Hadoop 2.3.0-cdh5.1.3, as described on the website.

I could see the allocated containers in the YARN ResourceManager, and after starting a Flink job via the CLI client it showed up on the Flink Dashboard.

The problem is that a job which runs in about 17 minutes in my local VM (3 cores, 4 GB RAM, input from local files) takes about 25 minutes on the cluster (18 containers with 4 GB and 8 cores each, input from HDFS with a replication factor of 5).
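
For reference, I started the session roughly like this (from memory, so take the exact flags with a grain of salt; -n should be the number of containers and -tm the memory per container in MB):

./bin/yarn-session.sh -n 18 -tm 4096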

From the Flink log, it seems all data was shuffled to a single machine, even for FlatMap operations.

log excerpt:
10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 5
10:54:09,589 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
10:54:09,590 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 128

The job takes two large input files (~9 GB). After filtering and converting them with a FlatMap (selectivity below 1%), it joins each of them twice with a small data set (< 1 MB); after that, the join results are joined with each other. The result is about 2.7 GB.

Any idea what causes this?

Cheers,
Robert

Re:

Stephan Ewen
Hi!

It looks like the job is running with a degree of parallelism (DOP) of one.

Can you set the DOP higher, either directly on the ExecutionEnvironment or (preferably) through the "-p" parameter on the command line?
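
For example, a minimal sketch against the 0.6 Java API (the value 144 is just an illustration matching your 18 containers x 8 cores; the data source is a placeholder):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DopExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Run every operator with 144 parallel instances by default.
        env.setDegreeOfParallelism(144);

        // Stand-in for your actual sources and transformations.
        DataSet<String> data = env.fromElements("a", "b", "c");
        data.print();

        env.execute("DOP example");
    }
}

Or pass it when submitting through the CLI, something like:

./bin/flink run -p 144 yourJob.jar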

You are using 0.6, is that correct? (Looks like it from the logs)

Stephan

Re:

Robert Waury
Yes, I'm running 0.6.1.

Setting the DOP manually worked, thanks.

Computation time is now down to around 100 seconds.

Is there a way to let Flink figure out the DOP automatically within a YARN application, or do I always have to set it manually?

Cheers,
Robert

Re:

rmetzger0
Not in the 0.6.1 release, no.
With the upcoming 0.7-incubating release, you can set the number of task slots per container (the -s flag), and this value will automatically be used for the default DOP.
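
For example (a hypothetical invocation; see the 0.7 YARN documentation for the exact flags):

./bin/yarn-session.sh -n 18 -s 8 -tm 4096

This would request 18 containers with 8 slots each, so jobs submitted through the CLI would presumably get a default DOP of 18 * 8 = 144 unless overridden with -p.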

Re:

Stephan Ewen
There is an open ticket for that: configuring the default DOP based on the number of containers and slots. It is not implemented yet, though.

Re:

rmetzger0
Are you referring to https://issues.apache.org/jira/browse/FLINK-968? As I said, users can pass the "-s" parameter to set the number of slots per container, and that number is then used by the CliFrontend.

I just found out that only the YARN cluster setup page mentions slots at all. So how slots are used is basically undocumented, even though it is a very important concept for properly configuring and running Flink. (See https://issues.apache.org/jira/browse/FLINK-1157)
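
For completeness: in a standalone (non-YARN) setup, the slot count would be configured in flink-conf.yaml instead (key name as of 0.7, from memory):

taskmanager.numberOfTaskSlots: 8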
