Hi,
Flink version: 1.5.5. My streaming job takes a checkpoint every minute, and I am getting the following exception:

2019-01-15 01:59:04,680 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 469 for job e9a08c0661a6c31b5af540cf352e1265 (2736 bytes in 124 ms).
2019-01-15 02:00:04,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 470 @ 1547497804679 for job e9a08c0661a6c31b5af540cf352e1265.
2019-01-15 02:00:04,754 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 471 @ 1547497804753 for job e9a08c0661a6c31b5af540cf352e1265.
2019-01-15 02:00:19,072 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 471 for job e9a08c0661a6c31b5af540cf352e1265 (18372 bytes in 14296 ms).
2019-01-15 02:00:19,984 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Split Reader: avro-file-watcher-source-group -> avro-file-watcher-source-group-event-mapper (1/6) (bd1375f88c81cfd7a9b5a432d4f73fe4) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 470 for operator Split Reader: avro-file-watcher-source-group -> avro-file-watcher-source-group-event-mapper (1/6).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1154)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:948)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:885)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 470 for operator Split Reader: avro-file-watcher-source-group -> avro-file-watcher-source-group-event-mapper (1/6).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:943)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:/pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-470/5fb3a899-8c0f-45f6-a847-42cbb71e6d19 in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:854)
    ... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs:/pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-470/5fb3a899-8c0f-45f6-a847-42cbb71e6d19 in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:325)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:447)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:352)
    at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
    ... 7 more
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-470/5fb3a899-8c0f-45f6-a847-42cbb71e6d19 (inode 542384858): File does not exist. Holder DFSClient_NONMAPREDUCE_1564502713_104 does not have any open files.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3660)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3750)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3717)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:912)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:547)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2345)
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
    at org.apache.hadoop.ipc.Client.call(Client.java:1498)
    at org.apache.hadoop.ipc.Client.call(Client.java:1398)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at com.sun.proxy.$Proxy18.complete(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:503)
    at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:291)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:203)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:185)
    at com.sun.proxy.$Proxy19.complete(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2489)
    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2466)
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2431)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:311)
    ... 12 more
2019-01-15 02:00:19,994 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job CS-[group] [ Mon Jan 14 22:06:07 IST 2019 ] (e9a08c0661a6c31b5af540cf352e1265) switched from state RUNNING to FAILING.
Thanks
Sohi
-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
|
Hi Sohi,

It seems the checkpoint file `hdfs:/pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-470/5fb3a899-8c0f-45f6-a847-42cbb71e6d19` did not exist for some reason. You can trace the life cycle of this file in the HDFS audit log to find out why it did not exist. Maybe the checkpoint directory was removed because checkpoint 470 failed [1][2].

Best,
Congxian

On Tue, Jan 15, 2019 at 2:57 PM sohimankotia <[hidden email]> wrote:
|
Yes, the file got deleted:

2019-01-15 10:40:41,360 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/192.168.3.184 cmd=delete src=/pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-470/5fb3a899-8c0f-45f6-a847-42cbb71e6d19 dst=null perm=null proto=rpc

It looks like the file was deleted by the job itself. Does that cause the job to restart? If a checkpoint fails, should Flink try the next checkpoint or restart the job?
|
Hi Sohi,

You can check the docs [1][2] to find the answer.

Best,
Congxian

On Tue, Jan 15, 2019 at 4:16 PM sohimankotia <[hidden email]> wrote:
|
Hi Sohimankotia,

You can control Flink's behaviour when a checkpoint fails via `ExecutionConfig#setFailTaskOnCheckpointError(boolean)`. By default it is set to true, which means that a Flink task will fail if a checkpoint error occurs. If you set it to false, the job won't fail when a checkpoint fails.

Cheers,
Till

On Wed, Jan 16, 2019 at 3:20 AM Congxian Qiu <[hidden email]> wrote:
|
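For reference, here is a minimal sketch of how this setting might be applied in a Flink 1.5 DataStream job through `CheckpointConfig#setFailOnCheckpointingErrors(boolean)`, the user-facing setter named elsewhere in this thread. The class name, the job name, and the placeholder pipeline are assumptions made for illustration; the 60-second interval is taken from the "checkpoint every minute" description in the original question.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerateCheckpointFailures {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every minute, as in the job described in this thread.
        env.enableCheckpointing(60_000L);

        // false: a task that hits an error while writing a checkpoint does not fail;
        // true (the default): the task fails, which fails the job.
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);

        // Placeholder pipeline (hypothetical), only so that the job has something to run.
        env.fromElements(1, 2, 3).print();

        env.execute("tolerate-checkpoint-failures-example");
    }
}
```

Note that this setter belongs to the 1.5-era API discussed here; later Flink releases deprecated it in favour of `setTolerableCheckpointFailureNumber(int)`, so check the documentation for the version you run.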
In reply to this post by Congxian Qiu
Hi Sohi,

Could it be that you configured your job's tasks to fail if a checkpoint fails (`streamExecutionEnvironment.getCheckpointConfig().setFailOnCheckpointingErrors(true)`)? Could you send the complete job master log?

If checkpoint 470 has been subsumed by 471, its directory may have been removed to release resources while some tasks were still writing checkpoint 470; those tasks then fail because they cannot access the removed files. This could be ignored if the checkpoint was simply subsumed by the next successful one, but `setFailOnCheckpointingErrors(true)` causes the job to fail.

Best,
Andrey

On Wed, Jan 16, 2019 at 3:20 AM Congxian Qiu <[hidden email]> wrote:
|
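The ordering Andrey describes can be illustrated with a toy model in plain Java. This is not Flink's implementation: `ToyCoordinator`, its methods, the in-memory set standing in for checkpoint directories, and the outcome strings are all hypothetical names invented for this sketch. It only shows the sequence of events: checkpoint 471 completes first and discards the older pending checkpoint 470, and the task that is still finishing its write for 470 then fails on close.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class SubsumedCheckpointDemo {

    /** Hypothetical stand-in for the coordinator plus the checkpoint storage. */
    static class ToyCoordinator {
        // IDs of checkpoints whose directory still exists.
        private final Set<Long> liveDirs = new HashSet<>();

        void trigger(long id) {
            liveDirs.add(id);
        }

        /** Completing a checkpoint subsumes (discards) every older one. */
        void complete(long id) {
            liveDirs.removeIf(other -> other < id);
        }

        /** A task closing its state file; fails if the directory is gone. */
        void closeStateFile(long id) throws IOException {
            if (!liveDirs.contains(id)) {
                throw new IOException("chk-" + id + ": file does not exist");
            }
        }
    }

    /** Outcome of a task finishing its asynchronous write for checkpoint id. */
    static String finishWrite(ToyCoordinator c, long id, boolean failOnCheckpointingErrors) {
        try {
            c.closeStateFile(id);
            return "ACKNOWLEDGED";
        } catch (IOException e) {
            // Mirrors the flag discussed in the thread: fail the task, or only give up this checkpoint.
            return failOnCheckpointingErrors ? "TASK_FAILED" : "CHECKPOINT_DECLINED";
        }
    }

    public static void main(String[] args) {
        ToyCoordinator c = new ToyCoordinator();
        c.trigger(470);
        c.trigger(471);
        c.complete(471); // 471 finishes first and removes chk-470

        System.out.println(finishWrite(c, 470, true));  // prints TASK_FAILED
        System.out.println(finishWrite(c, 470, false)); // prints CHECKPOINT_DECLINED
    }
}
```

In this model the late write for 470 is harmless on its own, since 471 already covers it; whether it brings the task down depends only on the flag.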
Hi Andrey,
Yes, setting setFailOnCheckpointingErrors(false) solved the problem. However, I am now intermittently getting this error:

2019-01-16 21:07:26,979 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler - Implementation error: Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under container_e187_1544779926156_82698_01_000002.
    at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:566)
    at sun.reflect.GeneratedMethodAccessor114.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2019-01-16 21:07:26,980 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - score-group -> Sink: kafka-sink-group (5/6) (be15ef0054701e9d94450a3704a0094c) switched from DEPLOYING to RUNNING.
2019-01-16 21:07:27,197 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Split Reader: avro-file-watcher-source-group -> avro-file-watcher-source-group-event-mapper (4/6) (513da1cadd7be96fa49c6f5ef8d3dc4f) switched from DEPLOYING to RUNNING.
2019-01-16 21:07:30,684 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler - Implementation error: Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under container_e187_1544779926156_82698_01_000002.
    at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:566)
    at sun.reflect.GeneratedMethodAccessor114.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks
Sohi
|