Error while running flink job on local environment

Vinayak Magadum
Hi,

I am using Flink version: 1.7.1

I have a Flink job that obtains the execution environment as shown below and then executes the job:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

When I run the code on a cluster, it runs fine. But when I run the job on my local machine via IntelliJ, I get the error below:

org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    <stack trace truncated >
Caused by: java.io.IOException: Insufficient number of network buffers: required 8, but only 3 available. The total number of network buffers is currently set to 12851 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:272)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:257)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:278)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
    at java.lang.Thread.run(Thread.java:748)
   
A workaround that makes it run locally is to use
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);

instead of StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

With Flink 1.4.2, StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); used to work in both cluster and local environments.

Is there any way to make StreamExecutionEnvironment.getExecutionEnvironment(); work in both cluster and local mode in Flink 1.7.1? Specifically, how can I make it work locally via IntelliJ?
--------
Thanks & Regards,
Vinayak

Re: Error while running flink job on local environment

Andrey Zagrebin-3
Hi Vinayak,

The error message hints at which config options to change; you could try StreamExecutionEnvironment.createLocalEnvironment(2, customConfig); to increase the resources.
This issue might also address the problem; it will be part of the 1.9 release:
https://issues.apache.org/jira/browse/FLINK-12852

Best,
Andrey
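A minimal sketch of Andrey's suggestion, assuming Flink 1.7.x is on the classpath. The three keys are the ones named in the error message; the values (fraction 0.2, min 64 MiB, max 2 GiB) are illustrative assumptions rather than recommendations, and the class name `LocalNetworkConfigExample` and helper `localNetworkConfig()` are hypothetical. The memory bounds are given as plain byte counts.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalNetworkConfigExample {

    // Hypothetical helper: a Configuration with more network buffer memory
    // for local (IDE) runs. Illustrative values (assumptions), not tuned recommendations.
    static Configuration localNetworkConfig() {
        Configuration config = new Configuration();
        // Fraction of memory reserved for network buffers.
        config.setString("taskmanager.network.memory.fraction", "0.2");
        // Lower and upper bounds as plain byte counts: 64 MiB and 2 GiB.
        config.setString("taskmanager.network.memory.min", String.valueOf(64L * 1024 * 1024));
        config.setString("taskmanager.network.memory.max", String.valueOf(2L * 1024 * 1024 * 1024));
        return config;
    }

    public static void main(String[] args) throws Exception {
        // Local environment with parallelism 2 and the custom network settings.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2, localNetworkConfig());
        // Placeholder pipeline standing in for the real job.
        env.fromElements(1, 2, 3).print();
        env.execute("local-network-config-example");
    }
}
```

Because `createLocalEnvironment` always starts an embedded local cluster, this variant only suits code that runs locally, which is the drawback Jeff raises in his reply.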

On Tue, Jul 30, 2019 at 5:42 PM Vinayak Magadum <[hidden email]> wrote:

Re: Error while running flink job on local environment

Jeff Zhang
@Andrey,

Although your approach will work, it requires users to write different code for local mode and other modes, which is inconvenient.
IMHO, we should either not check this kind of memory configuration in local mode, or implicitly set the TaskManager (TM) memory large enough in local mode to avoid this kind of problem.

Andrey Zagrebin <[hidden email]> wrote on Wed, Jul 31, 2019 at 1:32 AM:

--
Best Regards

Jeff Zhang

Re: Error while running flink job on local environment

Vinayak Magadum
Hi Andrey and Jeff,

Thank you for the replies.
I agree with Jeff. My concern is having to use different code for local and non-local deployments.
It would help if StreamExecutionEnvironment.getExecutionEnvironment() worked for both local and cluster deployments.
--------
Thanks & Regards,
Vinayak



On Wed, Jul 31, 2019 at 7:02 AM Jeff Zhang <[hidden email]> wrote:

Re: Error while running flink job on local environment

Biao Liu
Hi Vinayak,

If `StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2)` works for your case, you could try one of the following:

`StreamExecutionEnvironment.setDefaultLocalParallelism(2);`
`StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();`

or 

`StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();`
`env.setParallelism(2);`

These should be equivalent from the code's perspective.
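A sketch of Biao's first option as a complete program, assuming Flink 1.7.x is on the classpath. The class name `DefaultLocalParallelismExample`, the `createEnvironment()` helper and the `fromElements(1, 2, 3).print()` pipeline are hypothetical placeholders for the real job. As Nico explains in his reply, `setDefaultLocalParallelism` only matters when `getExecutionEnvironment()` falls back to a local environment, as it does when the job is started from IntelliJ.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DefaultLocalParallelismExample {

    // Hypothetical helper: same code path for IDE runs and cluster submissions.
    static StreamExecutionEnvironment createEnvironment() {
        // Only used if getExecutionEnvironment() has to create a local environment,
        // e.g. when the job is started from IntelliJ. A cluster submission takes
        // its parallelism from the submission/cluster settings instead.
        StreamExecutionEnvironment.setDefaultLocalParallelism(2);
        return StreamExecutionEnvironment.getExecutionEnvironment();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = createEnvironment();
        // Placeholder pipeline standing in for the real job.
        env.fromElements(1, 2, 3).print();
        env.execute("default-local-parallelism-example");
    }
}
```

The second option, `env.setParallelism(2)`, would instead also apply when the job is submitted to the cluster.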


On Wed, Jul 31, 2019 at 2:51 PM Vinayak Magadum <[hidden email]> wrote:

Re: Error while running flink job on local environment

Vinayak Magadum
Thank you, Biao and Nico, for the input and clarification. It's good to know that setDefaultLocalParallelism() has no impact on cluster deployments and can be used to solve the problem locally. I will try it out.

Thanks,
Vinayak

On Thu, Aug 1, 2019, 2:22 PM Nico Kruber <[hidden email]> wrote:
Hi Vinayak,
The first example that Biao provided (using
setDefaultLocalParallelism()) is actually what you want, since it
doesn't influence your job on the cluster. Your local default
parallelism (by default, the number of available processors) seems to
be too high for the job you are trying to run because, unlike on the
cluster, all TMs reside in one JVM and share that JVM's memory,
including the part used for network buffers.
If you want to reason about it further, Flink's documentation [1]
explains how many buffers are needed.


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#configuring-the-network-buffers
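To put rough numbers on Nico's explanation, here is a back-of-the-envelope sketch in plain Java (no Flink dependency). It assumes the rule of thumb from the network-buffer section of the linked documentation [1], roughly `#slots-per-TM^2 * #TMs * 4` buffers, and the 32768-byte buffer size reported in the error message; the class and method names are hypothetical. A local run is modeled, as an approximation, as a single TaskManager whose slot count equals the local parallelism. This is an estimate of how the demand scales, not Flink's exact per-gate accounting.

```java
public class NetworkBufferEstimate {

    // Buffer size reported in the error message: 32768 bytes (32 KiB).
    static final long BUFFER_SIZE_BYTES = 32_768L;

    // Rule of thumb (assumed from the docs): #slots-per-TM^2 * #TMs * 4.
    // It grows with the square of the slots because, during a shuffle, each
    // parallel task may need to talk to every other parallel task.
    static long suggestedBuffers(int slotsPerTaskManager, int taskManagers) {
        return (long) slotsPerTaskManager * slotsPerTaskManager * taskManagers * 4L;
    }

    // Memory occupied by a number of buffers, in bytes.
    static long bytesFor(long buffers) {
        return buffers * BUFFER_SIZE_BYTES;
    }

    public static void main(String[] args) {
        // Local run: one JVM, modeled as one TaskManager with one slot per core.
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.printf("local, %d slots: %d buffers%n", cores, suggestedBuffers(cores, 1));

        // Same job with setDefaultLocalParallelism(2): one TaskManager, 2 slots.
        System.out.printf("local, 2 slots: %d buffers%n", suggestedBuffers(2, 1));

        // Size of the pool reported in the error: 12851 buffers of 32 KiB.
        System.out.printf("pool in the error: %d MiB%n", bytesFor(12_851) >> 20);
    }
}
```

With 8 cores, for instance, the estimate is 256 buffers versus 16 at parallelism 2, which illustrates why lowering the local default parallelism relieves the pressure on the shared JVM.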

On 01/08/2019 06:02, Biao Liu wrote:

--
Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen