Great number of jobs and numberOfBuffers

Gwenhael Pasquiers

Hello,

We are hitting a limit with numberOfBuffers.

In a fairly complex job we run many operations, with many operators, over many folders (datehours).

To split the job into smaller “batches” (and so limit the required “numberOfBuffers”), I loop over the batches (handling the datehours 3 at a time); for each batch I create a new env and then call the execute() method.

However, there seems to be no cleanup: after a while, if the number of batches is too large, an error says that numberOfBuffers isn’t high enough. It looks like some kind of leak. Is there a way to clean the buffers up?

Re: Great number of jobs and numberOfBuffers

Ufuk Celebi
Hey Gwenhael,

the network buffers are recycled automatically after a job terminates.
If this does not happen, it would be quite a major bug.

To help debug this:

- Which version of Flink are you using?
- Does the job fail immediately after submission or later during execution?
- Is the following correct: the batch job that eventually fails
because of missing network buffers runs without problems if you submit
it to a fresh cluster with the same memory?

The network buffers are recycled after the task managers report the
task being finished. If you immediately submit the next batch, there is
a slight chance that the buffers are not recycled yet. As a possible
temporary workaround, could you try waiting for a short amount of
time before submitting the next batch?
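
The workaround suggested above could be sketched roughly as follows. This is only an illustration, not Flink code: `submitJob` is a hypothetical stand-in for "create a new ExecutionEnvironment, build the batch, call execute()", and the pause length is an arbitrary assumption. If the buffers really leak, a pause will not help; it only tests the hypothesis that recycling lags slightly behind job completion.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class PausedBatchRunner {

    /**
     * Runs the batches in order and sleeps before every batch except the first.
     * submitJob is a hypothetical placeholder for building and executing one
     * job; it is not part of any Flink API.
     * Returns the number of batches that were submitted and completed.
     */
    static int runAll(List<String> batches, Consumer<String> submitJob, long pauseMillis) {
        int completed = 0;
        for (String batch : batches) {
            if (completed > 0) {
                try {
                    // Give the task managers time to report finished tasks.
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag and stop submitting.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            submitJob.accept(batch);
            completed++;
        }
        return completed;
    }

    public static void main(String[] args) {
        List<String> batches = Arrays.asList("batch-1", "batch-2", "batch-3");
        int done = runAll(batches, b -> System.out.println("submitted " + b), 100L);
        System.out.println(done + " batches completed");
    }
}
```

In a real application the lambda would wrap the per-batch job construction, and the pause would be tuned by experiment.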

I think we should also be able to run the job without splitting it up
after increasing the network memory configuration. Did you already try
this?

Best,

Ufuk



Re: Great number of jobs and numberOfBuffers

Ufuk Celebi
PS: Also pulling in Nico (CC'd) who is working on the network stack.


RE: Great number of jobs and numberOfBuffers

Gwenhael Pasquiers
Hello,

We hit this bug with Flink 1.0.1 on YARN (maybe the YARN behavior is different?). We have had this issue for a long time and have been careful not to schedule too many jobs.

I'm currently upgrading the application to Flink 1.2.1 and I'd like to try to solve this issue.

I'm not submitting individual jobs to a standalone cluster.

I'm starting a single application that has a loop in its main function:
for (. . .) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env. . . .;
        env.execute();
}


The job fails at some point later during execution with the following error:
java.io.IOException: Insufficient number of network buffers: required 96, but only 35 available. The total number of network buffers is currently set to 36864. You can increase this number by setting the configuration key 'taskmanager.network.numberOfBuffers'.
  at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)

Before splitting the job into multiple sub-jobs, it failed right at startup.

Each "batch" job takes 10 to 30 minutes, and it fails after about a dozen of them (the first ones should have had enough time to be recycled).

We've already increased the jobmanager and "numberOfBuffers" values quite a bit. That way we can handle days of data, but not weeks or months. This is not very scalable. And as you say, I expected those buffers to be recycled, so that we would have no limit as long as each batch is small enough.

If I start my command again (removing the datehours that were successfully processed), it works, since it's a fresh new cluster.


Re: Great number of jobs and numberOfBuffers

Nico Kruber
Hi Gwenhael,
the effect you describe sounds a bit strange. Just to clarify your setup:

1) Is the loop you posted part of the application you run on YARN?
2) How many nodes are you running with?
3) What is the error you got when you tried to run the full program without
splitting it?
4) Can you give a rough sketch of what your program is composed of (operators,
parallelism, ...)?


Nico


RE: Great number of jobs and numberOfBuffers

Gwenhael Pasquiers
Hi,

1/ Yes, the loop is part of the application I run on YARN. Something like:
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;

public class MyFlinkApp {
        public static void main(String[] args) throws Exception {
                // parse arguments etc
                for (String datehour : datehours) {
                        // a new environment, and therefore a separate job, per batch
                        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                        env.readTextFile(datehour)
                                // datehour-1 / datehour-2: pseudocode for the two previous datehour folders
                                .union(env.readTextFile(datehour-1))
                                .union(env.readTextFile(datehour-2))
                                .map()
                                .groupBy()
                                .sortGroup()
                                .reduceGroup()
                                .......

                        // other steps, unions, processing, inputs, outputs

                        JobExecutionResult result = env.execute();

                        // read accumulators and send some statsd statistics at the end of batch
                }
        }
}

2/ The prod settings are something like 6 nodes with 8 task slots each, 32 GiB per node.

3/ I remember that we had the same error (not enough buffers) right at startup. I guess it was trying to allocate all the buffers at startup, whereas now it allocates them progressively (but still fails at the same limit).

4/ The program has many steps. It has about 5 inputs (readTextFile) and 2 outputs (TextHadoopOutputFormat, one in the middle of the processing, the other at the end), and each "batch" is composed of multiple union, flatMap, map, groupBy, sortGroup, reduceGroup and filter operations. If we start the Flink app on a whole week of data, we have to start 24 * 7 = 168 batches. Parallelism has the default value, except for the output writers (32 and 4), in order to limit the number of files on HDFS.
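
To make the batching concrete, here is a small standalone sketch of how the three inputs of each batch (the datehour and the two preceding hours) and the list of hourly batches for a week could be derived. It is an illustration only: the folder naming pattern `uuuuMMddHH` and the helper names are assumptions, since the thread does not say how datehour folders are named.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

final class DatehourBatches {

    // Assumed naming scheme for datehour folders, e.g. 2017081710.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("uuuuMMddHH");

    /** Returns the datehour itself followed by the two preceding hours. */
    static List<String> inputsFor(String datehour) {
        LocalDateTime hour = LocalDateTime.parse(datehour, FORMAT);
        List<String> inputs = new ArrayList<>();
        for (int back = 0; back < 3; back++) {
            inputs.add(hour.minusHours(back).format(FORMAT));
        }
        return inputs;
    }

    /** Returns one datehour per batch, starting at 'first' and covering 'days' days. */
    static List<String> datehours(String first, int days) {
        LocalDateTime start = LocalDateTime.parse(first, FORMAT);
        List<String> result = new ArrayList<>();
        for (int h = 0; h < 24 * days; h++) {
            result.add(start.plusHours(h).format(FORMAT));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [2017081701, 2017081700, 2017081623]
        System.out.println(inputsFor("2017081701"));
        // prints 168 (one batch per hour for a week)
        System.out.println(datehours("2017081400", 7).size());
    }
}
```

Note that the inputs of consecutive batches overlap by two hours, so the number of batches, not the amount of distinct input, grows with the time range.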




RE: Great number of jobs and numberOfBuffers

Gwenhael Pasquiers
Hello,

Sorry to ask again, but does anyone have an idea about this?


Re: Great number of jobs and numberOfBuffers

Nico Kruber
In reply to this post by Gwenhael Pasquiers
Hi Gwenhael,
First of all, we should try to get your job working without splitting it up,
as that should work. Also, the number of network buffers should not depend on
your input but rather on the job and its parallelism.

The IOException you reported can only come from either a) too few network
buffers in the first place or b) LocalBufferPool instances not being cleaned
up. From what I see, b) is done at task de-registration, and unless there are
further errors in the task managers' logs (are there any?!), this should go
through. Regarding a), [1] suggests that
#slots-per-TM^2 * #TMs * 4 = 1536 (in your case)
buffers should be enough, and you said you are already using 36864.
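
The arithmetic behind the 1536 figure can be checked with a few lines of Java. This is just a worked example of the rule of thumb quoted above; it assumes one task manager per node (6 task managers with 8 slots each), which the thread implies but does not state outright, and the class and method names are made up for illustration.

```java
final class NetworkBufferEstimate {

    /** Rule of thumb from the thread: slotsPerTaskManager^2 * taskManagers * 4. */
    static long recommendedBuffers(int slotsPerTaskManager, int taskManagers) {
        return (long) slotsPerTaskManager * slotsPerTaskManager * taskManagers * 4L;
    }

    public static void main(String[] args) {
        long recommended = recommendedBuffers(8, 6); // 8 slots per TM, 6 TMs (assumed)
        long configured = 36864L;                    // value from the error message

        System.out.println("recommended = " + recommended);                 // 1536
        System.out.println("configured / recommended = " + configured / recommended); // 24
    }
}
```

The configured value is 24 times the rule-of-thumb estimate, which is why running out of buffers after a dozen batches points at pools not being released rather than at an undersized configuration.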

So this brings me back to the question on why it is input-dependent.
Can you share your log files (privately if you prefer)?


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/config.html#configuring-the-network-buffers

On Monday, 21 August 2017 12:03:46 CEST Gwenhael Pasquiers wrote:
> Hi,
>
> 1/ Yes, the loop is part of the application I run on yarn. Something like :
> public class MyFlinkApp {
> public static void main(String[] args){
> // parse arguments etc
> for(String datehour:datehours){
> ExecutionEnvironment env =
> ExecutionEnvironment.getExectionEnvironment();
 env.readText(datehour)

> .union(env.readText(datehour-1))
> .union(env.readText(datehour-2))
> .map()
> .groupby()
> .sortGroup()
> .reduceGroup()
> .......
>
> // other steps, unions, processing, inputs, outputs
>
> JobExecutionResult result = env.execute();
>
> // read accumulators and send some statsd statistics at the end of
batch
> }
> }
> }
>
> 2/ The prod settings are something like 6 nodes with 8 taskslots each, 32Gib
> per node.
 
> 3/ I remember that we had the same error (not enough buffers) right at
> startup. I guess that it was trying to allocate all buffers at startup as
> it is now doing it progressively (but still fails at the same limit)
 
> 4/ The program has many steps, it has about 5 inputs (readTextFile) and 2
> outputs (TextHadoopOutputFormat, one in the middle of the processing, the
> other at the end), it is composed of multiple union, flatmap, map, groupby,
> sortGroup, reduceGroup, filter, for each "batch". And if we start the flink
> app on a whole week of data, we will have to start (24 * 7) batches.
> Parallelism has the default value except for the output writers (32 and 4)
> in order to limit the numbers of files on HDFS.
 

>
>
> -----Original Message-----
> From: Nico Kruber [mailto:[hidden email]]
> Sent: vendredi 18 août 2017 14:58
> To: Gwenhael Pasquiers <[hidden email]>
> Cc: Ufuk Celebi <[hidden email]>; [hidden email]
> Subject: Re: Great number of jobs and numberOfBuffers
>
> Hi Gwenhael,
> the effect you describe sounds a bit strange. Just to clarify your setup:
>
> 1) Is the loop you were posting part of the application you run on yarn?
> 2) How many nodes are you running with?
> 3) What is the error you got when you tried to run the full program without
> splitting it?
 4) can you give a rough sketch of what your program is

> composed of (operators, parallelism,...)?
>
> Nico
>
> On Thursday, 17 August 2017 11:53:25 CEST Gwenhael Pasquiers wrote:
>
> > Hello,
> >
> > This bug was met in flink 1.0.1 over yarn (maybe the yarn behavior is
> > different ?). We've been having this issue for a long time and we were
> > careful not to schedule too many jobs.
>
>  
>
> > I'm currently upgrading the application towards flink 1.2.1 and I'd
> > like to try to solve this issue.
>
>  
>
> > I'm not submitting individual jobs to a standalone cluster.
> >
> > I'm starting a single application that has a loop in its main function :
> > for(. . .) {
> >
> > Environment env = Environment.getExectionEnvironment();
> > env. . . .;
> > env.execute();
> >
> > }
> >
> >
> > The job fails at some point later during execution with the following
> > error:
>
>  java.io.IOException: Insufficient number of network buffers:
>
> > required 96, but only 35 available. The total number of network
> > buffers is currently set to 36864. You can increase this number by
> > setting the configuration key 'taskmanager.network.numberOfBuffers'.
> > at
> > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBuf
> > ferPo
> > ol(NetworkBufferPool.java:196)
> > Before splitting the job in multiple sub-jobs it failed right at startup.
> >
> > Each "batch" job takes 10 to 30 minutes and it fails after about dozen
> > of them (the first ones should have had enough time to be recycled).
>
>  
>
> > We've already increased the jobmanager and "numberOfBuffers" values
> > quite a bit. That way we can handle days of data, but not weeks or
> > months. This is not very scalable. And as you say, I felt that those
> > buffers should be recycled and that way we should have no limit as
> > long as each batch is small enough.
> >
> > If I start my command again (removing the datehours that were
> > successfully processed) it will work since it's a fresh new cluster.
> >
> > -----Original Message-----
> > From: Ufuk Celebi [mailto:[hidden email]]
> > Sent: jeudi 17 août 2017 11:24
> > To: Ufuk Celebi <[hidden email]>
> > Cc: Gwenhael Pasquiers <[hidden email]>;
> > [hidden email]; Nico Kruber <[hidden email]>
> > Subject: Re: Great number of jobs and numberOfBuffers
> >
> > PS: Also pulling in Nico (CC'd) who is working on the network stack.
> >
> > On Thu, Aug 17, 2017 at 11:23 AM, Ufuk Celebi <[hidden email]> wrote:
> >
> >
> > > Hey Gwenhael,
> > >
> > >
> > >
> > >
> > >
> > > the network buffers are recycled automatically after a job terminates.
> > > If this does not happen, it would be quite a major bug.
> > >
> > >
> > >
> > >
> > >
> > > To help debug this:
> > >
> > >
> > >
> > >
> > >
> > > - Which version of Flink are you using?
> > > - Does the job fail immediately after submission or later during
> > > execution?
>
>  - Is the following correct: the batch job that eventually
>
> > > fails
> > > because of missing network buffers runs without problems if you
> > > submit it to a fresh cluster with the same memory
> > >
> > >
> > >
> > >
> > >
> > > The network buffers are recycled after the task managers report the
> > > task being finished. If you immediately submit the next batch there
> > > is a slight chance that the buffers are not recycled yet. As a
> > > possible temporary work around, could you try waiting for a short
> > > amount of time before submitting the next batch?
> > >
> > >
> > >
> > >
> > >
> > > I think we should also be able to run the job without splitting it
> > > up after increasing the network memory configuration. Did you
> > > already try this?
> > >
> > >
> > >
> > >
> > >
> > > Best,
> > >
> > >
> > >
> > >
> > >
> > > Ufuk
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Thu, Aug 17, 2017 at 10:38 AM, Gwenhael Pasquiers
> > > <[hidden email]> wrote:
> > >
> > >
> > >> Hello,
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> We’re meeting a limit with the numberOfBuffers.
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> In a quite complex job we do a lot of operations, with a lot of
> > >> operators, on a lot of folders (datehours).
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> In order to split the job into smaller “batches” (to limit the
> > >> necessary
> > >> “numberOfBuffers”) I’ve done a loop over the batches (handle the
> > >> datehours 3 by 3), for each batch I create a new env then call the
> > >> execute() method.
> >>
> >>
> >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> However it looks like there is no cleanup : after a while, if the
> > >> number of batches is too big, there is an error saying that the
> > >> numberOfBuffers isn’t high enough. It kinds of looks like some leak.
> > >> Is there a way to clean them up ?
>
>


RE: Great number of jobs and numberOfBuffers

Gwenhael Pasquiers
Hi,

Well yes, I could probably make it work with a constant number of operators (and consequently buffers) by developing specific input and output classes, and that way I'd have a workaround for the buffer issue.

The size of my job is input-dependent mostly because my code creates one full processing chain per input folder (there is one folder per hour of data) and that processing chain has many functions. The number of input folders is an argument to the software and can vary from 1 to hundreds (when we re-compute something like a month of data).

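for (String folder : folders) {
        // one new environment, and therefore one job, per input folder
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.readTextFile(folder).[......].writeAsText(folder + "_processed");
        env.execute();
}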

There is one full processing chain per folder (a loop is creating them) because the name of the output is the same as the name of the input, and I did not want to create a specific "rolling-output" with bucketers.

So, yes, I could develop a source that supports a list of folders and puts the source name into the produced data. My processing could also handle tuples where one field is the source folder and use it as a key where appropriate, and finally yes I could also create a sink that will "dispatch" the datasets to the correct folder according to that field of the tuple.
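
To make the alternative design above concrete, here is a minimal sketch in plain Java. It deliberately does not use the Flink API: `TaggedPipeline`, `Tagged`, `readAll`, `process` and `dispatch` are hypothetical names standing in for the custom source, the single shared processing chain and the dispatching sink, and the folder names are made up. It only illustrates the idea that one chain can serve any number of folders when each record carries its source folder.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

final class TaggedPipeline {

    /** A record that remembers which input folder it came from. */
    static final class Tagged {
        final String folder;
        final String value;

        Tagged(String folder, String value) {
            this.folder = folder;
            this.value = value;
        }
    }

    /** Stand-in for the source: flattens all folders into one list of tagged records. */
    static List<Tagged> readAll(Map<String, List<String>> linesByFolder) {
        List<Tagged> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : linesByFolder.entrySet()) {
            for (String line : entry.getValue()) {
                out.add(new Tagged(entry.getKey(), line));
            }
        }
        return out;
    }

    /** Stand-in for the processing chain: one chain for all records, the tag is preserved. */
    static List<Tagged> process(List<Tagged> records, UnaryOperator<String> fn) {
        List<Tagged> out = new ArrayList<>();
        for (Tagged t : records) {
            out.add(new Tagged(t.folder, fn.apply(t.value)));
        }
        return out;
    }

    /** Stand-in for the sink: groups records under "<folder>_processed" using the tag. */
    static Map<String, List<String>> dispatch(List<Tagged> records) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (Tagged t : records) {
            out.computeIfAbsent(t.folder + "_processed", k -> new ArrayList<>()).add(t.value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical folder names and contents, for illustration only.
        Map<String, List<String>> input = new LinkedHashMap<>();
        input.put("2017081700", Arrays.asList("a", "b"));
        input.put("2017081701", Arrays.asList("c"));

        Map<String, List<String>> output =
                dispatch(process(readAll(input), String::toUpperCase));
        output.forEach((folder, lines) -> System.out.println(folder + " -> " + lines));
    }
}
```

In this shape the number of processing steps no longer depends on the number of folders, which is the property that would keep the operator count (and presumably the buffer requirement) constant in a real job.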

But at first that seemed too complicated (premature optimization) so I coded my job the "naïve" way, then split it because I thought that there would be some sort of recycling of those buffers between jobs.
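
For reference, the splitting described here (datehours handled 3 by 3, one execution per batch) can be sketched in plain Java as follows. This is only an illustration under assumptions: `BatchedSubmission`, `partition` and `runAll` are hypothetical names, `runBatch` stands in for "create a new env, build the chains for these folders, call execute()", and the pause between batches is the temporary workaround suggested earlier in the thread, not something known to fix the issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class BatchedSubmission {

    /** Splits the folder list into consecutive batches of at most batchSize entries. */
    static List<List<String>> partition(List<String> folders, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < folders.size(); i += batchSize) {
            int end = Math.min(i + batchSize, folders.size());
            batches.add(new ArrayList<>(folders.subList(i, end)));
        }
        return batches;
    }

    /**
     * Runs the batches one after the other and sleeps between two consecutive batches.
     * Returns the number of batches that were run.
     */
    static int runAll(List<String> folders, int batchSize, long pauseMillis,
                      Consumer<List<String>> runBatch) {
        List<List<String>> batches = partition(folders, batchSize);
        for (int i = 0; i < batches.size(); i++) {
            runBatch.accept(batches.get(i));
            if (i < batches.size() - 1) {
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted between batches", e);
                }
            }
        }
        return batches.size();
    }

    public static void main(String[] args) {
        // Hypothetical datehour folder names, for illustration only.
        List<String> folders =
                Arrays.asList("2017081700", "2017081701", "2017081702", "2017081703");
        runAll(folders, 3, 10L, batch -> System.out.println("submitting " + batch));
    }
}
```

Keeping the batch size and the pause as parameters makes it easy to test whether a longer wait between executions changes when the buffer error appears.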

If the recycling works under normal conditions, maybe the question is whether it also works when multiple jobs are run from within the same jar, versus running the jar multiple times?




PS: I don't have log files at hand; the app is run on a separate and secured platform. Sure, I could try to reproduce the issue with a mock app, but I'm not sure it would help the discussion.