Re: Flink job on secure Yarn fails after many hours

Posted by Maximilian Michels on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-job-on-secure-Yarn-fails-after-many-hours-tp3856p3861.html

Hi Niels,

Sorry to hear you ran into this exception. At first glance, it
looks like a bug in Hadoop to me.

> "Not retrying because the invoked method is not idempotent, and unable to determine whether it was invoked"

That message by itself is nothing to worry about. It comes from
Hadoop's internal retry mechanism, which re-attempts failed calls when
it is safe to do so. The failed call is not idempotent (executing it a
second time could change state again), and the client cannot tell
whether the first attempt was actually invoked on the server, so
Hadoop does not retry it.
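
To illustrate the decision behind that message, here is a minimal sketch
in plain Java. It is not Hadoop's actual code: the class RetrySketch, the
enum Delivery and the method mayRetry are hypothetical names I made up for
this example, and the real logic in Hadoop's retry policies is more
involved.

```java
/** Hypothetical sketch of the retry decision; not Hadoop's implementation. */
public class RetrySketch {

    /** What the client knows about a failed call (assumed, simplified model). */
    enum Delivery {
        NOT_SENT, // the request certainly never reached the server
        UNKNOWN   // the request may or may not have been executed
    }

    /**
     * Decide whether a failed call may be sent again.
     * Idempotent calls are always safe to repeat. Non-idempotent calls are
     * only safe to repeat when we know the first attempt was never executed.
     */
    static boolean mayRetry(boolean idempotent, Delivery delivery) {
        if (idempotent) {
            return true;
        }
        return delivery == Delivery.NOT_SENT;
    }

    public static void main(String[] args) {
        // The case from the log: non-idempotent call, outcome unknown.
        System.out.println(mayRetry(false, Delivery.UNKNOWN));  // prints "false"
        // An idempotent call could be repeated even if the outcome is unknown.
        System.out.println(mayRetry(true, Delivery.UNKNOWN));   // prints "true"
        // A non-idempotent call that never left the client could be repeated.
        System.out.println(mayRetry(false, Delivery.NOT_SENT)); // prints "true"
    }
}
```

The first case is the one in your log: the call is not repeated, and the
original exception is passed on to the caller.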

The main issue is this exception:

> org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid AMRMToken from appattempt_1443166961758_163901_000001

The stack trace shows that this exception occurs in the allocate call,
which Flink uses to request container status information from the
Resource Manager:

> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:259)
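
In case it helps with reading similar traces: the frame above is simply
the first one from the top that belongs to Flink rather than to Hadoop or
the JDK. A minimal sketch of that lookup, using a few frames copied from
your trace, could look like this. The class FirstFlinkFrame and the method
firstFrameIn are hypothetical helpers for illustration only; they are not
part of Flink or Hadoop.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper: find the first stack frame from a given package. */
public class FirstFlinkFrame {

    /**
     * Returns the first frame (with the leading "at " removed) whose
     * fully qualified name starts with packagePrefix, if there is one.
     */
    static Optional<String> firstFrameIn(List<String> frames, String packagePrefix) {
        return frames.stream()
                .map(String::trim)
                .map(f -> f.startsWith("at ") ? f.substring(3) : f)
                .filter(f -> f.startsWith(packagePrefix))
                .findFirst();
    }

    public static void main(String[] args) {
        // A few frames from the posted trace, top of the stack first.
        List<String> frames = Arrays.asList(
            "at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)",
            "at com.sun.proxy.$Proxy14.allocate(Unknown Source)",
            "at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:245)",
            "at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:259)",
            "at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)"
        );
        // Prints the YarnJobManager frame, i.e. the Flink code that called allocate.
        System.out.println(firstFrameIn(frames, "org.apache.flink.").orElse("no Flink frame found"));
    }
}
```

The frame directly above it in the trace, AMRMClientImpl.allocate, is the
Hadoop client call that was rejected with the invalid token.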

Are there any more exceptions in the log? Do you have the complete
logs available and could you share them?


Best regards,
Max

On Wed, Dec 2, 2015 at 11:47 AM, Niels Basjes <[hidden email]> wrote:

> Hi,
>
>
> We have a Kerberos secured Yarn cluster here and I'm experimenting with
> Apache Flink on top of that.
>
> A few days ago I started a very simple Flink application (just stream the
> time as a String into HBase 10 times per second).
>
> I (deliberately) asked our IT-ops guys to make my account have a max ticket
> time of 5 minutes and a max renew time of 10 minutes (yes, ridiculously low
> timeout values because I needed to validate this
> https://issues.apache.org/jira/browse/FLINK-2977).
>
> This job is started with a keytab file and after running for 31 hours it
> suddenly failed with the exception you see below.
>
> I had the same job running for almost 400 hours until that failed too (I was
> too late to check the logfiles but I suspect the same problem).
>
>
> So in that time span my tickets have expired and new tickets have been
> obtained several hundred times.
>
>
> The main error I see is this message, which appears while a ticket is
> expiring and being renewed:
>
>      Not retrying because the invoked method is not idempotent, and unable
> to determine whether it was invoked
>
>
> Yarn on the cluster is 2.6.0 ( HDP 2.6.0.2.2.4.2-2 )
>
> Flink is version 0.10.1
>
>
> How do I fix this?
> Is this a bug (in either Hadoop or Flink) or am I doing something wrong?
> Would upgrading Yarn to 2.7.1  (i.e. HDP 2.3) fix this?
>
>
> Niels Basjes
>
>
>
> 21:30:27,821 WARN  org.apache.hadoop.security.UserGroupInformation
> - PriviledgedActionException as:nbasjes (auth:SIMPLE)
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Invalid AMRMToken from appattempt_1443166961758_163901_000001
> 21:30:27,861 WARN  org.apache.hadoop.ipc.Client
> - Exception encountered while connecting to the server :
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Invalid AMRMToken from appattempt_1443166961758_163901_000001
> 21:30:27,861 WARN  org.apache.hadoop.security.UserGroupInformation
> - PriviledgedActionException as:nbasjes (auth:SIMPLE)
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Invalid AMRMToken from appattempt_1443166961758_163901_000001
> 21:30:27,891 WARN  org.apache.hadoop.io.retry.RetryInvocationHandler
> - Exception while invoking class
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate.
> Not retrying because the invoked method is not idempotent, and unable to
> determine whether it was invoked
> org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
> AMRMToken from appattempt_1443166961758_163901_000001
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
> at
> org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
> at
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)
> at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy14.allocate(Unknown Source)
> at
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:245)
> at
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:259)
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:100)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by:
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Invalid AMRMToken from appattempt_1443166961758_163901_000001
> at org.apache.hadoop.ipc.Client.call(Client.java:1406)
> at org.apache.hadoop.ipc.Client.call(Client.java:1359)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> at com.sun.proxy.$Proxy13.allocate(Unknown Source)
> at
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
> ... 29 more
> 21:30:27,943 ERROR akka.actor.OneForOneStrategy
> - Invalid AMRMToken from appattempt_1443166961758_163901_000001
> org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
> AMRMToken from appattempt_1443166961758_163901_000001
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
> at
> org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
> at
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)
> at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy14.allocate(Unknown Source)
> at
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:245)
> at
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:259)
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:100)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by:
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Invalid AMRMToken from appattempt_1443166961758_163901_000001
> at org.apache.hadoop.ipc.Client.call(Client.java:1406)
> at org.apache.hadoop.ipc.Client.call(Client.java:1359)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> at com.sun.proxy.$Proxy13.allocate(Unknown Source)
> at
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
> ... 29 more
> 21:30:28,075 INFO  org.apache.flink.yarn.YarnJobManager
> - Stopping JobManager akka.tcp://flink@10.10.200.3:39527/user/jobmanager.
> 21:30:28,088 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Source: Custom Source -> Sink: Unnamed (1/1)
> (db0d95c11c14505827e696eec7efab94) switched from RUNNING to CANCELING
> 21:30:28,113 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Source: Custom Source -> Sink: Unnamed (1/1)
> (db0d95c11c14505827e696eec7efab94) switched from CANCELING to FAILED
> 21:30:28,184 INFO  org.apache.flink.runtime.blob.BlobServer
> - Stopped BLOB server at 0.0.0.0:41281
> 21:30:28,185 ERROR org.apache.flink.runtime.jobmanager.JobManager
> - Actor akka://flink/user/jobmanager#403236912 terminated, stopping
> process...
> 21:30:28,286 INFO  org.apache.flink.runtime.webmonitor.WebRuntimeMonitor
> - Removing web root dir /tmp/flink-web-e1a44f94-ea6d-40ee-b87c-e3122d5cb9bd
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes