Hey, Recently I used flink1.6.3 to run a task and the following exception occurred. Can someone tell me how to solve this problem? AsynchronousException{java.lang.Exception: Could not materialize checkpoint 10631 for operator Source: Custom Source -> Flat Map -> (Map -> Timestamps/Watermarks -> from: (dtEventTime, bkdata_par_offset, N_Route, iEventId, vVersion, vFromIp, vLocalIp, iRequestType, vAppid, vOpenid, vCmdid, vL5_ip_port, iPermission, iResult, dProcessTime, vErrorMsg, vUrl, vDeviceInfo, rowtime) -> where: (OR(=(iResult, -308), =(iResult, -309))), select: (rowtime, vErrorMsg) -> time attribute: (rowtime), Map) (8/20).} at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception: Could not materialize checkpoint 10631 for operator Source: Custom Source -> Flat Map -> (Map -> Timestamps/Watermarks -> from: (dtEventTime, bkdata_par_offset, N_Route, iEventId, vVersion, vFromIp, vLocalIp, iRequestType, vAppid, vOpenid, vCmdid, vL5_ip_port, iPermission, iResult, dProcessTime, vErrorMsg, vUrl, vDeviceInfo, rowtime) -> where: (OR(=(iResult, -308), =(iResult, -309))), select: (rowtime, vErrorMsg) -> time attribute: (rowtime), Map) (8/20). at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) ... 6 more Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:/app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631/6e86cce6-9ac9-4856-b0a3-42c81fcafadc in order to obtain the stream state handle at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53) at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) ... 5 more Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs:/app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631/6e86cce6-9ac9-4856-b0a3-42c81fcafadc in order to obtain the stream state handle at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328) at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:454) at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:359) at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50) ... 7 more Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631/6e86cce6-9ac9-4856-b0a3-42c81fcafadc (inode 7450914863): File does not exist. Holder DFSClient_NONMAPREDUCE_-1995220833_1 does not have any open files. at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3602) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3690) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3660) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:729) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.complete(AuthorizationProviderProxyClientProtocol.java:243) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:527) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038) at org.apache.hadoop.ipc.Client.call(Client.java:1468) at org.apache.hadoop.ipc.Client.call(Client.java:1399) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy9.complete(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:443) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy10.complete(Unknown Source) at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2283) at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2267) at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64) at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:314) ... 12 more |
Hi, Leor Seems like the checkpoint file `/app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631/6e86cce6-9ac9-4856-b0a3-42c81fcafadc` did not exist for some reason, you can check the life cycle of this file from hdfs audit log and find out why the file did not exist. maybe the checkpoint directory has been removed because the checkpoint 10631 has been subsumed[1][2]. Leor Li <[hidden email]> 于2019年1月13日周日 下午6:59写道:
Best, Congxian |
Free forum by Nabble | Edit this page |