Hi all,

I have just hit a problem; the stack trace is at the bottom.

http://flink.incubator.apache.org/docs/0.6-incubating/faq.html

Notably, 2 nodes run just 2 tasks... so the equation changes a bit. Is it possible that this is causing problems? Furthermore, the tasks run on the same machines as HBase and Solr, so the number of threads is quite relevant.

Thanks a lot for the support! :-)

Regards,
Stefano

okkam-nano-2.okkam.it

Error: java.lang.Exception: Failed to deploy the task CHAIN Reduce(org.okkam.flink.maintenance.deduplication.blocking.RemoveDuplicateReduceGroupFunction) -> Combine(org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction) (15/28) - execution #0 to slot SubSlot 5 (cab978f80c0cb7071136cd755e971be9 (5) - ALLOCATED/ALIVE): org.apache.flink.runtime.io.network.InsufficientResourcesException: okkam-nano-2.okkam.it has not enough buffers to safely execute CHAIN Reduce(org.okkam.flink.maintenance.deduplication.blocking.RemoveDuplicateReduceGroupFunction) -> Combine(org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction) (36 buffers missing)
    at org.apache.flink.runtime.io.network.ChannelManager.ensureBufferAvailability(ChannelManager.java:262)
    at org.apache.flink.runtime.io.network.ChannelManager.register(ChannelManager.java:130)
    at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:598)
    at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
    at org.apache.flink.runtime.executiongraph.Execution$2.run(Execution.java:284)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Hey Stefano,

your number of task slots per task manager is 6, right, i.e. 6 * 6 = 36 slots in total? You can check the total number of available task slots in the job manager web interface.

And from the log output: are you running all tasks with a parallelism of 28?

On Tue, Dec 2, 2014 at 11:21 AM, Stefano Bortoli <[hidden email]> wrote:
Thanks Ufuk,

the total number of slots is 28, as shown in the log. I have increased the number of buffers to 4096, and the process is running fine.

Here is my program:

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        MyTableInputFormat inputFormat = new MyTableInputFormat();

        DataSet<Tuple3<String, String, Long>> dataset =
                env.createInput(inputFormat).rebalance();

        DataSet<Tuple4<String, String, String, Boolean>> grouped = dataset
                .flatMap(new FindCandidateWithMatchFlagMapFunction<>())
                .groupBy(0, 1)
                .reduceGroup(new RemoveDuplicateReduceGroupFunction())
                .distinct();

        DataSet<Tuple6<String, String, String, Boolean, String, String>> joined = grouped
                .join(dataset).where(0).equalTo(0).with(new Join1ToGetCandidates())
                .join(dataset).where(1).equalTo(0).with(new Join2ToGetCandidates());

        DataSet<Tuple3<String, String, String>> duplicates = joined
                .filter(new SingleMatchFilterFunctionWithFlagMatch<>())
                .map(new MapToTuple3MapFunction<>());

        DataSet<Tuple2<StringValue, MyStringList>> duplicatesToprint = duplicates
                .distinct()
                .groupBy(0)
                .reduceGroup(new ConsolidateByTypeDuplicatesGroupReduceFunction());

        duplicatesToprint
                .writeAsText("file:///tmp/" + EnsMaintenanceConstants.WORKING_TABLE + "/duplicates.txt",
                        WriteMode.OVERWRITE)
                .setParallelism(1);

        // System.out.println(env.getExecutionPlan());
        env.execute();
    }

Regards,
Stefano

2014-12-02 12:17 GMT+01:00 Ufuk Celebi <[hidden email]>:
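For readers landing on this thread: the "equation" mentioned in the first message is presumably the buffer rule of thumb from the Flink documentation. As far as I recall it, the suggested minimum is slots-per-TaskManager squared, times the number of TaskManagers, times 4, and the setting that gets raised is `taskmanager.network.numberOfBuffers`. Both the formula and the key name are stated here from memory and are not quoted in the thread, so treat them as assumptions. A minimal sketch of the arithmetic, with a hypothetical class and method name:

```java
// Hypothetical helper, not part of Flink. It only evaluates the rule of thumb
// slotsPerTaskManager^2 * taskManagers * 4, which is an assumption recalled
// from the Flink documentation, not something stated in this thread.
final class NetworkBufferEstimate {

    private NetworkBufferEstimate() {
    }

    /** Returns the rule-of-thumb minimum number of network buffers. */
    static long recommendedBuffers(int slotsPerTaskManager, int taskManagers) {
        if (slotsPerTaskManager <= 0 || taskManagers <= 0) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        // Use long arithmetic so large clusters do not overflow an int.
        return (long) slotsPerTaskManager * slotsPerTaskManager * taskManagers * 4L;
    }

    public static void main(String[] args) {
        // Ufuk's guess in this thread: 6 slots on each of 6 task managers.
        System.out.println(NetworkBufferEstimate.recommendedBuffers(6, 6));
    }
}
```

For the 6 x 6 layout Ufuk asks about, this evaluates to 864. The rule is only a starting point: a cluster with uneven slot counts per node, as described in the first message, does not fit it exactly, and in this thread raising the value to 4096 is what made the job run.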