Hi, I'm having some trouble running a java based Flink job in a yarn-session. The job itself consists of reading a set of files resulting in a DataStream (I use DataStream because in the future I intend to change the file with a Kafka feed), then does some parsing and eventually writes the data into HBase. Most of the time running this works fine yet sometimes it fails with this exception: org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 794b5ce385c296b7943fa4c1f072d6b9@13aa7ef02a5d9e0898204ec8ce283363 not found. at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203) at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345) at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286) at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123) at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118) at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272) at akka.dispatch.OnComplete.internal(Future.scala:248) at akka.dispatch.OnComplete.internal(Future.scala:245) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) I went through all logs at the Hadoop side of all the related containers and other than this exception I did not see any warning/error that might explain what is going on here. Now the "Most of the time running this works fine" makes this hard to troubleshoot. When I run the same job again it may run perfectly that time. I'm using flink-1.3.2-bin-hadoop27-scala_2.11.tgz and I double checked my pom.xml and I use the same version for Flink / Scala in there. The command used to start the yarn-session on my experimental cluster (no security, no other users): /usr/local/flink-1.3.2/bin/yarn-session.sh \ Two relevant fragments from my application pom.xml: <flink.version>1.3.2</flink.version>
I could really use some suggestions where to look for the root cause of this. Is this something in my application? My Hadoop cluster? Or is this a problem in Flink 1.3.2? Thanks. Best regards / Met vriendelijke groeten,
Niels Basjes |
Hey Niels,
thanks for the detailed report. I don't think that it is related to the Hadoop or Scala version. I think the following happens: - Occasionally, one of your tasks seems to be extremely slow in registering its produced intermediate result (the data shuffled between TaskManagers) - Another task is already requesting to consume data from this task but cannot find it (after multiple retries) and it fails the complete job (your stack trace) That happens only occasionally probably due to load in your cluster. The slow down could have multiple reasons... - Is your Hadoop cluster resource constrained and the tasks are slow to deploy? - Is your application JAR very large and needs a lot of time downloading? We have two options at this point: 1) You can increase the maximum retries via the config option: "taskmanager.network.request-backoff.max" The default is 10000 (milliseconds) and specifies what the maximum request back off is [1]. Increasing this to 30000 would give you two extra retries with pretty long delays (see [1]). 2) To be sure that this is really what is happening we could increase the log level of certain classes and check whether they have registered their results or not. If you want to do this, I'm more than happy to provide you with some classes to enable DEBUG logging for. What do you think? – Ufuk DETAILS ======= - The TaskManagers produce and consume intermediate results - When a TaskManager wants to consume a result, it directly queries the producing TaskManager for it - An intermediate result becomes ready for consumption during initial task setup (state DEPLOYING) - When a TaskManager is slow to register its intermediate result and the consumer requests the result before it is ready, it can happen that a requested partition is "not found" This is what is also happening here. We retry to request the intermediate result multiple times with timed backoff [1] and only fail the request (your stack trace) if the partition is still not ready although we expect it to be ready (that is there was no failure at the producing task). [1] Starting by default at 100 millis and going up to 10_000 millis by doubling that time (100, 200, 400, 800, 1600, 3200, 6400, 10000) On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes <[hidden email]> wrote: > Hi, > > I'm having some trouble running a java based Flink job in a yarn-session. > > The job itself consists of reading a set of files resulting in a DataStream > (I use DataStream because in the future I intend to change the file with a > Kafka feed), then does some parsing and eventually writes the data into > HBase. > > Most of the time running this works fine yet sometimes it fails with this > exception: > > org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: > Partition 794b5ce385c296b7943fa4c1f072d6b9@13aa7ef02a5d9e0898204ec8ce283363 > not found. > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345) > at > org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286) > at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123) > at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118) > at > org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272) > at akka.dispatch.OnComplete.internal(Future.scala:248) > at akka.dispatch.OnComplete.internal(Future.scala:245) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at > akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > I went through all logs at the Hadoop side of all the related containers and > other than this exception I did not see any warning/error that might explain > what is going on here. > > Now the "Most of the time running this works fine" makes this hard to > troubleshoot. When I run the same job again it may run perfectly that time. > > I'm using flink-1.3.2-bin-hadoop27-scala_2.11.tgz and I double checked my > pom.xml and I use the same version for Flink / Scala in there. > > The command used to start the yarn-session on my experimental cluster (no > security, no other users): > > /usr/local/flink-1.3.2/bin/yarn-session.sh \ > --container 180 \ > --name "Flink on Yarn Experiments" \ > --slots 1 \ > --jobManagerMemory 4000 \ > --taskManagerMemory 4000 \ > --streaming \ > --detached > > Two relevant fragments from my application pom.xml: > > <flink.version>1.3.2</flink.version> > <flink.scala.version>2.11</flink.scala.version> > > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-java</artifactId> > <version>${flink.version}</version> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-java_${flink.scala.version}</artifactId> > <version>${flink.version}</version> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-clients_${flink.scala.version}</artifactId> > <version>${flink.version}</version> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-hbase_${flink.scala.version}</artifactId> > <version>${flink.version}</version> > </dependency> > > > I could really use some suggestions where to look for the root cause of > this. > Is this something in my application? My Hadoop cluster? Or is this a problem > in Flink 1.3.2? > > Thanks. > > -- > Best regards / Met vriendelijke groeten, > > Niels Basjes |
Hey Niels,
any update on this? – Ufuk On Mon, Oct 9, 2017 at 10:16 PM, Ufuk Celebi <[hidden email]> wrote: > Hey Niels, > > thanks for the detailed report. I don't think that it is related to > the Hadoop or Scala version. I think the following happens: > > - Occasionally, one of your tasks seems to be extremely slow in > registering its produced intermediate result (the data shuffled > between TaskManagers) > - Another task is already requesting to consume data from this task > but cannot find it (after multiple retries) and it fails the complete > job (your stack trace) > > That happens only occasionally probably due to load in your cluster. > The slow down could have multiple reasons... > - Is your Hadoop cluster resource constrained and the tasks are slow to deploy? > - Is your application JAR very large and needs a lot of time downloading? > > We have two options at this point: > 1) You can increase the maximum retries via the config option: > "taskmanager.network.request-backoff.max" The default is 10000 > (milliseconds) and specifies what the maximum request back off is [1]. > Increasing this to 30000 would give you two extra retries with pretty > long delays (see [1]). > > 2) To be sure that this is really what is happening we could increase > the log level of certain classes and check whether they have > registered their results or not. If you want to do this, I'm more than > happy to provide you with some classes to enable DEBUG logging for. > > What do you think? > > – Ufuk > > DETAILS > ======= > > - The TaskManagers produce and consume intermediate results > - When a TaskManager wants to consume a result, it directly queries > the producing TaskManager for it > - An intermediate result becomes ready for consumption during initial > task setup (state DEPLOYING) > - When a TaskManager is slow to register its intermediate result and > the consumer requests the result before it is ready, it can happen > that a requested partition is "not found" > > This is what is also happening here. We retry to request the > intermediate result multiple times with timed backoff [1] and only > fail the request (your stack trace) if the partition is still not > ready although we expect it to be ready (that is there was no failure > at the producing task). > > [1] Starting by default at 100 millis and going up to 10_000 millis by > doubling that time (100, 200, 400, 800, 1600, 3200, 6400, 10000) > > > On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes <[hidden email]> wrote: >> Hi, >> >> I'm having some trouble running a java based Flink job in a yarn-session. >> >> The job itself consists of reading a set of files resulting in a DataStream >> (I use DataStream because in the future I intend to change the file with a >> Kafka feed), then does some parsing and eventually writes the data into >> HBase. >> >> Most of the time running this works fine yet sometimes it fails with this >> exception: >> >> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: >> Partition 794b5ce385c296b7943fa4c1f072d6b9@13aa7ef02a5d9e0898204ec8ce283363 >> not found. >> at >> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203) >> at >> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128) >> at >> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345) >> at >> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286) >> at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123) >> at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118) >> at >> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272) >> at akka.dispatch.OnComplete.internal(Future.scala:248) >> at akka.dispatch.OnComplete.internal(Future.scala:245) >> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175) >> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172) >> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) >> at >> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) >> at >> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) >> at >> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) >> at >> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) >> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) >> at >> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) >> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) >> at >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> at >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> at >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> >> I went through all logs at the Hadoop side of all the related containers and >> other than this exception I did not see any warning/error that might explain >> what is going on here. >> >> Now the "Most of the time running this works fine" makes this hard to >> troubleshoot. When I run the same job again it may run perfectly that time. >> >> I'm using flink-1.3.2-bin-hadoop27-scala_2.11.tgz and I double checked my >> pom.xml and I use the same version for Flink / Scala in there. >> >> The command used to start the yarn-session on my experimental cluster (no >> security, no other users): >> >> /usr/local/flink-1.3.2/bin/yarn-session.sh \ >> --container 180 \ >> --name "Flink on Yarn Experiments" \ >> --slots 1 \ >> --jobManagerMemory 4000 \ >> --taskManagerMemory 4000 \ >> --streaming \ >> --detached >> >> Two relevant fragments from my application pom.xml: >> >> <flink.version>1.3.2</flink.version> >> <flink.scala.version>2.11</flink.scala.version> >> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-java</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-streaming-java_${flink.scala.version}</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-clients_${flink.scala.version}</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-hbase_${flink.scala.version}</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> >> I could really use some suggestions where to look for the root cause of >> this. >> Is this something in my application? My Hadoop cluster? Or is this a problem >> in Flink 1.3.2? >> >> Thanks. >> >> -- >> Best regards / Met vriendelijke groeten, >> >> Niels Basjes |
Hi, I'm currently doing some tests to see it this info helps. I was running a different high CPU task on one of the nodes outside Yarn, so I took that one out of the cluster to see if that helps. What I do find strange that in this kind of error scenario the entire job fails. I would have expected something similar as with 'good old' MapReduce: The missing task is simply resubmitted and ran again. Why doesn't that happen? Niels On Wed, Oct 11, 2017 at 8:49 AM, Ufuk Celebi <[hidden email]> wrote: Hey Niels, Best regards / Met vriendelijke groeten,
Niels Basjes |
Hey Niels,
Flink currently restarts the complete job if you have a restart strategy configured: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/restart_strategies.html. I agree that only restarting the required parts of the pipeline is an important optimization. Flink has not implemented this (fully) yet but it's on the agenda [1] and work has already started [2]. In this particular case, everything is just slow and we don't need the restart at all if you give the consumer a higher max timeout. Please report back when you have more info :-) – Ufuk [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures [2] https://issues.apache.org/jira/browse/FLINK-4256 On Thu, Oct 12, 2017 at 10:17 AM, Niels Basjes <[hidden email]> wrote: > Hi, > > I'm currently doing some tests to see it this info helps. > I was running a different high CPU task on one of the nodes outside Yarn, so > I took that one out of the cluster to see if that helps. > > What I do find strange that in this kind of error scenario the entire job > fails. > I would have expected something similar as with 'good old' MapReduce: The > missing task is simply resubmitted and ran again. > Why doesn't that happen? > > > Niels > > On Wed, Oct 11, 2017 at 8:49 AM, Ufuk Celebi <[hidden email]> wrote: >> >> Hey Niels, >> >> any update on this? >> >> – Ufuk >> >> >> On Mon, Oct 9, 2017 at 10:16 PM, Ufuk Celebi <[hidden email]> wrote: >> > Hey Niels, >> > >> > thanks for the detailed report. I don't think that it is related to >> > the Hadoop or Scala version. I think the following happens: >> > >> > - Occasionally, one of your tasks seems to be extremely slow in >> > registering its produced intermediate result (the data shuffled >> > between TaskManagers) >> > - Another task is already requesting to consume data from this task >> > but cannot find it (after multiple retries) and it fails the complete >> > job (your stack trace) >> > >> > That happens only occasionally probably due to load in your cluster. >> > The slow down could have multiple reasons... >> > - Is your Hadoop cluster resource constrained and the tasks are slow to >> > deploy? >> > - Is your application JAR very large and needs a lot of time >> > downloading? >> > >> > We have two options at this point: >> > 1) You can increase the maximum retries via the config option: >> > "taskmanager.network.request-backoff.max" The default is 10000 >> > (milliseconds) and specifies what the maximum request back off is [1]. >> > Increasing this to 30000 would give you two extra retries with pretty >> > long delays (see [1]). >> > >> > 2) To be sure that this is really what is happening we could increase >> > the log level of certain classes and check whether they have >> > registered their results or not. If you want to do this, I'm more than >> > happy to provide you with some classes to enable DEBUG logging for. >> > >> > What do you think? >> > >> > – Ufuk >> > >> > DETAILS >> > ======= >> > >> > - The TaskManagers produce and consume intermediate results >> > - When a TaskManager wants to consume a result, it directly queries >> > the producing TaskManager for it >> > - An intermediate result becomes ready for consumption during initial >> > task setup (state DEPLOYING) >> > - When a TaskManager is slow to register its intermediate result and >> > the consumer requests the result before it is ready, it can happen >> > that a requested partition is "not found" >> > >> > This is what is also happening here. We retry to request the >> > intermediate result multiple times with timed backoff [1] and only >> > fail the request (your stack trace) if the partition is still not >> > ready although we expect it to be ready (that is there was no failure >> > at the producing task). >> > >> > [1] Starting by default at 100 millis and going up to 10_000 millis by >> > doubling that time (100, 200, 400, 800, 1600, 3200, 6400, 10000) >> > >> > >> > On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes <[hidden email]> wrote: >> >> Hi, >> >> >> >> I'm having some trouble running a java based Flink job in a >> >> yarn-session. >> >> >> >> The job itself consists of reading a set of files resulting in a >> >> DataStream >> >> (I use DataStream because in the future I intend to change the file >> >> with a >> >> Kafka feed), then does some parsing and eventually writes the data into >> >> HBase. >> >> >> >> Most of the time running this works fine yet sometimes it fails with >> >> this >> >> exception: >> >> >> >> >> >> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: >> >> Partition >> >> 794b5ce385c296b7943fa4c1f072d6b9@13aa7ef02a5d9e0898204ec8ce283363 >> >> not found. >> >> at >> >> >> >> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203) >> >> at >> >> >> >> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128) >> >> at >> >> >> >> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345) >> >> at >> >> >> >> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286) >> >> at >> >> org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123) >> >> at >> >> org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118) >> >> at >> >> >> >> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272) >> >> at akka.dispatch.OnComplete.internal(Future.scala:248) >> >> at akka.dispatch.OnComplete.internal(Future.scala:245) >> >> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175) >> >> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172) >> >> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) >> >> at >> >> >> >> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) >> >> at >> >> >> >> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) >> >> at >> >> >> >> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) >> >> at >> >> >> >> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) >> >> at >> >> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) >> >> at >> >> >> >> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) >> >> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) >> >> at >> >> >> >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >> >> at >> >> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> >> at >> >> >> >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> >> at >> >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> >> at >> >> >> >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> >> >> >> I went through all logs at the Hadoop side of all the related >> >> containers and >> >> other than this exception I did not see any warning/error that might >> >> explain >> >> what is going on here. >> >> >> >> Now the "Most of the time running this works fine" makes this hard to >> >> troubleshoot. When I run the same job again it may run perfectly that >> >> time. >> >> >> >> I'm using flink-1.3.2-bin-hadoop27-scala_2.11.tgz and I double checked >> >> my >> >> pom.xml and I use the same version for Flink / Scala in there. >> >> >> >> The command used to start the yarn-session on my experimental cluster >> >> (no >> >> security, no other users): >> >> >> >> /usr/local/flink-1.3.2/bin/yarn-session.sh \ >> >> --container 180 \ >> >> --name "Flink on Yarn Experiments" \ >> >> --slots 1 \ >> >> --jobManagerMemory 4000 \ >> >> --taskManagerMemory 4000 \ >> >> --streaming \ >> >> --detached >> >> >> >> Two relevant fragments from my application pom.xml: >> >> >> >> <flink.version>1.3.2</flink.version> >> >> <flink.scala.version>2.11</flink.scala.version> >> >> >> >> >> >> >> >> <dependency> >> >> <groupId>org.apache.flink</groupId> >> >> <artifactId>flink-java</artifactId> >> >> <version>${flink.version}</version> >> >> </dependency> >> >> >> >> <dependency> >> >> <groupId>org.apache.flink</groupId> >> >> <artifactId>flink-streaming-java_${flink.scala.version}</artifactId> >> >> <version>${flink.version}</version> >> >> </dependency> >> >> >> >> <dependency> >> >> <groupId>org.apache.flink</groupId> >> >> <artifactId>flink-clients_${flink.scala.version}</artifactId> >> >> <version>${flink.version}</version> >> >> </dependency> >> >> >> >> <dependency> >> >> <groupId>org.apache.flink</groupId> >> >> <artifactId>flink-hbase_${flink.scala.version}</artifactId> >> >> <version>${flink.version}</version> >> >> </dependency> >> >> >> >> >> >> I could really use some suggestions where to look for the root cause of >> >> this. >> >> Is this something in my application? My Hadoop cluster? Or is this a >> >> problem >> >> in Flink 1.3.2? >> >> >> >> Thanks. >> >> >> >> -- >> >> Best regards / Met vriendelijke groeten, >> >> >> >> Niels Basjes > > > > > -- > Best regards / Met vriendelijke groeten, > > Niels Basjes |
Hi I did some tests and it turns out I was really overloading the cluster which caused the problems. I tried the timeout setting but that didn't help. Simply 'not overloading' the system did help. Thanks. Niels On Thu, Oct 12, 2017 at 10:42 AM, Ufuk Celebi <[hidden email]> wrote: Hey Niels, Best regards / Met vriendelijke groeten,
Niels Basjes |
Free forum by Nabble | Edit this page |