flink-storm FlinkLocalCluster issue


#ZHANG SHUHAO#

Hi everyone,

 

I’m a student researcher currently working on Flink.

I’m trying out the flink-storm example project (version 0.10.2, flink-storm-examples, word-count-local), but I got the following error:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #0 (tokenizer (2/4)) @ (unassigned) - [SCHEDULED] > with groupID < b86aa7eb76c417c63afb577ea46ddd72 > in sharing group < SlotSharingGroup [0ea70b6a3927c02f29baf9de8f59575b, b86aa7eb76c417c63afb577ea46ddd72, cbc10505364629b45b28e79599c2f6b8] >. Resources available to scheduler: Number of instances=1, total number of slots=1, available slots=0
    at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:255)
    at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
    at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
    at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:992)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 

 

I notice that, by default, the task manager has only one slot. Changing the setting in flink-conf does not help, since I want to debug locally through FlinkLocalCluster (not submit the job to a locally running cluster).

 

I have tried the following:

import backtype.storm.Config;

Config config = new Config();
config.put(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1024);
cluster.submitTopology(topologyId, config, ft);

 

 

But it’s not working. Is there any way to work around this?

 

Many thanks.

 

shuhao zhang (Tony).

Re: flink-storm FlinkLocalCluster issue

Till Rohrmann

Hi Shuhao,

The configuration you’re providing is only used by the Storm compatibility layer, not by Flink itself. When you run your job locally, the LocalFlinkMiniCluster should be started with as many slots as the maximum degree of parallelism in your topology; you can check this in FlinkLocalCluster.java:96. When you submit your job to a remote cluster, you have to define the number of slots in the flink-conf.yaml file.
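For the remote-cluster case, a minimal sketch of the relevant flink-conf.yaml entry (the value 4 is only an example; it should be at least the topology's maximum parallelism):

```yaml
# flink-conf.yaml -- number of task slots each TaskManager offers.
# Example value; use at least the maximum parallelism of the topology.
taskmanager.numberOfTaskSlots: 4
```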

Cheers,
Till




Re: flink-storm FlinkLocalCluster issue

#ZHANG SHUHAO#
Hi Till,

Thanks for your reply, but it appears that the cluster still starts with only one slot. I have stepped through the Flink source code and confirmed this.

I'm using Flink 0.10.2, with the source code downloaded from the Flink website and nothing changed. I simply try to run the flink-storm word-count-local example, and it fails.


Re: flink-storm FlinkLocalCluster issue

Stephan Ewen
Hi!

On 0.10.x, the Storm compatibility layer does not properly configure the local Flink executor with the right parallelism.

This is fixed in 1.0: if you try the latest snapshot or 1.0-Release-Candidate-1, it should work.

Greetings,
Stephan





RE: flink-storm FlinkLocalCluster issue

#ZHANG SHUHAO#

Thanks for the confirmation.

When will 1.0 be available in the Maven repository?

 


Re: flink-storm FlinkLocalCluster issue

Maximilian Michels
Hi Zhang,

Please have a look here for the 1.0.0-rc2:

Binaries: http://people.apache.org/~rmetzger/flink-1.0.0-rc2/
Maven repository:
https://repository.apache.org/content/repositories/orgapacheflink-1064

Cheers,
Max
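For anyone wanting to try the release candidate from that staging repository, a sketch of the pom.xml entries it would take (the artifact id, Scala-version suffix, and version string are assumptions; verify the exact coordinates against the repository listing):

```xml
<!-- Staging repository for the 1.0.0 release candidate -->
<repositories>
  <repository>
    <id>apache-flink-rc</id>
    <url>https://repository.apache.org/content/repositories/orgapacheflink-1064</url>
  </repository>
</repositories>

<dependencies>
  <!-- Storm compatibility layer; coordinates assumed, check the repository -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-storm_2.10</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
```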
