Batch jobs with a very large number of input splits

Niels Basjes
Hi,

I'm working on a batch process using Flink and I ran into an interesting problem.
The number of input splits in my job is really really large.

I currently have an HBase input (with more than 1000 regions), and in the past I have worked with MapReduce jobs reading 2000+ files.

The problem is that if I run such a job in a "small" yarn-session (i.e. fewer than 1000 tasks), I get a fatal error indicating that there are not enough resources.
For a continuous streaming job this makes sense, yet for a batch job (like mine) this is an undesirable error.

For my HBase situation I currently have a workaround: I override the createInputSplits method of the TableInputFormat and thus control the input splits that are created.
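
The post does not say how the override limits the splits. One possible approach, shown here only as a sketch, is to merge adjacent region ranges until at most a chosen number of splits remains. The code is plain Java with no Flink or HBase dependency: KeyRange, SplitMerger and maxSplits are hypothetical names, and a real override would have to build the input format's own split objects instead.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitMerger {

    /** Hypothetical stand-in for an input split: a half-open row-key range [start, end). */
    public static final class KeyRange {
        public final String start;
        public final String end;

        public KeyRange(String start, String end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Merges adjacent ranges (assumed sorted and contiguous) so that at most maxSplits remain. */
    public static List<KeyRange> merge(List<KeyRange> regions, int maxSplits) {
        if (maxSplits < 1) {
            throw new IllegalArgumentException("maxSplits must be >= 1");
        }
        List<KeyRange> merged = new ArrayList<>();
        if (regions.isEmpty()) {
            return merged;
        }
        // Ceiling division: number of regions folded into each merged range.
        int perSplit = (regions.size() + maxSplits - 1) / maxSplits;
        for (int i = 0; i < regions.size(); i += perSplit) {
            int last = Math.min(i + perSplit, regions.size()) - 1;
            merged.add(new KeyRange(regions.get(i).start, regions.get(last).end));
        }
        return merged;
    }

    public static void main(String[] args) {
        // 1000 made-up regions with zero-padded keys, merged down to at most 50 ranges.
        List<KeyRange> regions = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            regions.add(new KeyRange(String.format("%04d", i), String.format("%04d", i + 1)));
        }
        List<KeyRange> merged = merge(regions, 50);
        System.out.println(merged.size() + " splits, first " + merged.get(0));
    }
}
```

Merging only changes how the work is cut up. As the replies in this thread point out, the number of splits does not determine how many parallel tasks a job needs.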

What is the correct way to solve this? (No, my cluster is NOT big enough to run that many parallel tasks.)


--
Best regards / Met vriendelijke groeten,

Niels Basjes
Re: Batch jobs with a very large number of input splits

rmetzger0
Hi Niels,

In Flink, you don't need one task per file, since splits are assigned lazily to reading tasks.
What exactly is the error you are getting when trying to read that many input splits? (Is it on the JobManager?)
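
To illustrate what lazy assignment means, here is a minimal plain-Java simulation. It is not Flink code: the shared queue stands in for the component that hands out splits, and LazySplitAssignment, run, numSplits and readers are hypothetical names. A fixed number of reader tasks each request the next split only when they have finished the previous one, so 1000 splits can be processed by 50 readers.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LazySplitAssignment {

    /**
     * Starts `readers` tasks that pull from a shared queue of `numSplits` splits.
     * Returns how many splits each reader ended up processing.
     */
    public static Map<Integer, Integer> run(int numSplits, int readers) {
        ConcurrentLinkedQueue<Integer> pending = new ConcurrentLinkedQueue<>();
        for (int s = 0; s < numSplits; s++) {
            pending.add(s);
        }

        Map<Integer, Integer> handled = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(readers);
        for (int r = 0; r < readers; r++) {
            final int readerId = r;
            pool.execute(() -> {
                // A reader asks for its next split only when it is ready for more work.
                while (pending.poll() != null) {
                    handled.merge(readerId, 1, Integer::sum);
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("readers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for readers", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> handled = run(1000, 50);
        int total = handled.values().stream().mapToInt(Integer::intValue).sum();
        System.out.println(handled.size() + " readers processed " + total + " splits");
    }
}
```

How the splits are distributed over the readers varies from run to run. The only fixed quantities are the total number of splits and the upper bound on concurrent readers.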

Regards,
Robert

(In reply to Niels Basjes, Thu, Aug 18, 2016 at 1:56 PM)

Re: Batch jobs with a very large number of input splits

Niels Basjes
I did more digging and finally understand what goes wrong.
I create a yarn-session with 50 slots.
Then I run my job, which has a lot of input splits because my HBase table has hundreds of regions.
The job then runs with parallelism 50 because I did not specify a value.
As a consequence, the second job I start in the same yarn-session finds 0 available task slots and fails with this exception:

08/23/2016 09:58:52 Job execution switched to status FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: ...... Resources available to scheduler: Number of instances=5, total number of slots=50, available slots=0

So my conclusion for now is that if you want to run batch jobs in a yarn-session, you MUST specify the parallelism for all steps; otherwise the first job fills the yarn-session completely and you cannot run multiple jobs in parallel.
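
The slot arithmetic behind this conclusion can be sketched with a small model. It is a simplification, not Flink's scheduler: it assumes that a job occupies as many slots as its parallelism and that a job without an explicit parallelism takes every slot in the session. SlotAccounting and submit are hypothetical names.

```java
public class SlotAccounting {
    private final int totalSlots;
    private int freeSlots;

    public SlotAccounting(int totalSlots) {
        this.totalSlots = totalSlots;
        this.freeSlots = totalSlots;
    }

    /**
     * Tries to reserve slots for a job.
     * A parallelism of 0 or less means "not specified", modelled here as all slots in the session.
     * Returns false when too few slots are free, which corresponds to the NoResourceAvailableException case.
     */
    public boolean submit(int parallelism) {
        int wanted = parallelism > 0 ? parallelism : totalSlots;
        if (wanted > freeSlots) {
            return false;
        }
        freeSlots -= wanted;
        return true;
    }

    public int freeSlots() {
        return freeSlots;
    }

    public static void main(String[] args) {
        // First job without an explicit parallelism: it takes all 50 slots.
        SlotAccounting unbounded = new SlotAccounting(50);
        System.out.println("job 1 accepted: " + unbounded.submit(0));
        System.out.println("job 2 accepted: " + unbounded.submit(10));

        // Same session size, but both jobs are capped at parallelism 20.
        SlotAccounting capped = new SlotAccounting(50);
        System.out.println("job 1 accepted: " + capped.submit(20));
        System.out.println("job 2 accepted: " + capped.submit(20));
        System.out.println("free slots left: " + capped.freeSlots());
    }
}
```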

Is this conclusion correct?

Niels Basjes


(In reply to Robert Metzger, Fri, Aug 19, 2016 at 3:18 PM)

Re: Batch jobs with a very large number of input splits

Fabian Hueske-2
Hi Niels,

Yes, in YARN mode the default parallelism is the number of available slots.

You can change the default task parallelism like this:

1) Use the -p parameter when submitting a job via the CLI client [1]
2) Set a parallelism on the execution environment: env.setParallelism()

Best, Fabian

(In reply to Niels Basjes, 2016-08-23 10:29 GMT+02:00)