Hello,
Every once in a while our checkpoints fail with the following exception:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 65912 for operator AGGREGATION-FILTER (2/2).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 65912 for operator AGGREGATION-FILTER (2/2).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs:/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:454)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:359)
    at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
    ... 7 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc (inode 181723246) Holder DFSClient_NONMAPREDUCE_-10072319_1 does not have any open files.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2668)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFileInternal(FSDirWriteFileOp.java:621)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFile(FSDirWriteFileOp.java:601)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:2713)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:882)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:555)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:847)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:790)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2486)
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
    at org.apache.hadoop.ipc.Client.call(Client.java:1435)
    at org.apache.hadoop.ipc.Client.call(Client.java:1345)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
    at com.sun.proxy.$Proxy12.complete(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:484)
    at sun.reflect.GeneratedMethodAccessor143.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
    at com.sun.proxy.$Proxy13.complete(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:861)
    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:839)
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:805)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:314)
    ... 12 more

What could be the problem?
Flink version: 1.6.2
Checkpointing configuration: Screenshot_2019-05-15_at_13.png <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/Screenshot_2019-05-15_at_13.png>
Average state size: ~56 MB
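For reference, a minimal sketch of how a checkpointing setup like this is typically wired up in the Flink 1.6 Java API. The interval, pause, and mode values below are illustrative placeholders, not the actual values from the screenshot; only the HDFS checkpoint path is taken from the stack trace above.

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // State is checkpointed to HDFS, matching the path seen in the stack trace above.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/data/checkpoints"));

        // Placeholder values -- the actual settings are in the attached screenshot.
        env.enableCheckpointing(60_000L);                                 // checkpoint every 60 s
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L); // 30 s pause between checkpoints
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // ... build and execute the actual job here ...
    }
}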
Best Regards,
Pedro Chaves
Hi,

Could you also post the job master logs, and ideally the full task manager logs? This failure can be caused by some other previous failure.

Best,
Andrey

On Wed, May 15, 2019 at 2:48 PM PedroMrChaves <[hidden email]> wrote:
> Hello,
Hi Pedro
Could you please share the audit log for the file `/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc`? It seems that this file no longer existing is what caused the problem (maybe the file was created and then deleted for some reason).

Best,
Congxian

On Thu, May 16, 2019 at 3:47 PM, Andrey Zagrebin <[hidden email]> wrote:
In reply to this post by Andrey Zagrebin-3
Hello,
Thanks for the help. I've attached the logs. Our cluster has 2 job managers (HA) and 4 task managers.

logs.tgz <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/logs.tgz>

Regards,
Pedro
Best Regards,
Pedro Chaves
In reply to this post by Congxian Qiu
Hello Andrey,
The audit log doesn't have anything that would point to the file being deleted. The only thing worth mentioning is the following line:

2019-05-15 10:01:39,082 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: BLOCK* blk_1248714854_174974084 is COMMITTED but not COMPLETE(numNodes= 0 < minimum = 1) in file /flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc

Regards,
Pedro Chaves
Best Regards,
Pedro Chaves
In reply to this post by PedroMrChaves
The issue happened again.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 47400 for operator ENRICH (1/4).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 47400 for operator ENRICH (1/4).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:/flink/data/checkpoints/da596dd6909fe250ccf553f9c31e000c/chk-47400/72df57cb-2286-4c1a-ade6-79891f39b150 in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs:/flink/data/checkpoints/da596dd6909fe250ccf553f9c31e000c/chk-47400/72df57cb-2286-4c1a-ade6-79891f39b150 in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
    ... 7 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /flink/data/checkpoints/da596dd6909fe250ccf553f9c31e000c/chk-47400/72df57cb-2286-4c1a-ade6-79891f39b150 (inode 188591918) Holder DFSClient_NONMAPREDUCE_-2031154839_1 does not have any open files.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2668)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFileInternal(FSDirWriteFileOp.java:621)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFile(FSDirWriteFileOp.java:601)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:2713)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:882)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:555)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:847)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:790)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2486)
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
    at org.apache.hadoop.ipc.Client.call(Client.java:1435)
    at org.apache.hadoop.ipc.Client.call(Client.java:1345)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
    at com.sun.proxy.$Proxy12.complete(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:484)
    at sun.reflect.GeneratedMethodAccessor191.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
    at com.sun.proxy.$Proxy13.complete(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:861)
    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:839)
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:805)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
    ... 12 more

Any ideas?
Best Regards,
Pedro Chaves
Hi Pedro
From the previously given log, I found that checkpoint 65912 had expired, and that is what then raised the IOException.
When a checkpoint expires, its checkpoint directory is deleted (CheckpointCoordinator, line 549 on the release-1.6 branch), but tasks that have not yet finished keep writing to the files under the deleted directory, which is why this IOException occurs. Could you please also check the audit log for the checkpoint directory (and, if that directory was not deleted, the parent directory of the checkpoint directory)?
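If the expiry is indeed the trigger, one common mitigation is to give checkpoints more time to complete and to stop a single failed checkpoint from failing the task. A minimal sketch against the Flink 1.6 Java API; the interval and timeout values below are placeholders, not recommendations:

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // placeholder interval

        CheckpointConfig config = env.getCheckpointConfig();

        // Give slow checkpoints more time before the coordinator expires them and
        // removes the chk-XXXX directory (placeholder: 30 minutes).
        config.setCheckpointTimeout(30 * 60 * 1000L);

        // Do not fail the task when an individual checkpoint cannot be completed;
        // the checkpoint is declined instead (Flink 1.5+ API; later versions replace
        // this with setTolerableCheckpointFailureNumber).
        config.setFailOnCheckpointingErrors(false);
    }
}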
Best,
Congxian

On May 22, 2019, 04:16 +0800, PedroMrChaves <[hidden email]> wrote:
> The issue happened again.
Unfortunately the audit logs for HDFS were not enabled. We will enable them and post the results when the problem happens again. Nonetheless, we don't have any other process using Hadoop besides Flink.
Best Regards,
Pedro Chaves