Data Loss in HDFS after Job failure

Posted by Dominique Rondé-2 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Data-Loss-in-HDFS-after-Job-failure-tp10122.html

Hi @all!

I have noticed some strange behavior with the rolling HDFS sink. We consume
events from a Kafka topic and write them into an HDFS filesystem. We use
the RollingSink implementation in this way:

    RollingSink<String> sink = new RollingSink<String>("/some/hdfs/directory") //
            .setBucketer(new DateTimeBucketer(YYYY_MM_DD)) //
            .disableCleanupOnOpen() //
            .setBatchSize(10485760L) //
            .setPartPrefix("part") //
            .setPendingPrefix("") //
            .setPendingSuffix("");
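
For context, this is roughly how the sink is wired into the job. This is only a
simplified sketch, not our production code: the topic name, the Kafka properties,
the consumer version (FlinkKafkaConsumer09) and the value behind our YYYY_MM_DD
constant are placeholders/assumptions here.

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
    import org.apache.flink.streaming.connectors.fs.RollingSink;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaToHdfsJob {

        // Assumption: the date format behind our YYYY_MM_DD constant, matching
        // the /2016-11-07/ bucket directories we see in HDFS.
        private static final String YYYY_MM_DD = "yyyy-MM-dd";

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoints drive the sink's pending/committed file lifecycle.
            env.enableCheckpointing(60000);

            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "broker:9092"); // placeholder
            kafkaProps.setProperty("group.id", "hdfs-writer");          // placeholder

            DataStream<String> events = env.addSource(
                    new FlinkKafkaConsumer09<String>("some-topic", new SimpleStringSchema(), kafkaProps));

            RollingSink<String> sink = new RollingSink<String>("/some/hdfs/directory") //
                    .setBucketer(new DateTimeBucketer(YYYY_MM_DD)) //
                    .disableCleanupOnOpen() //
                    .setBatchSize(10485760L) //
                    .setPartPrefix("part") //
                    .setPendingPrefix("") //
                    .setPendingSuffix("");

            events.addSink(sink);
            env.execute("kafka-to-hdfs");
        }
    }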

Over the last few days we had some network trouble that took one or more
TaskManagers out of service for a while. For that reason, some Flink jobs
were cancelled because there were not enough slots available. After the
TaskManagers came back, the jobs were restarted. After that, all (!!)
HDFS directories were absolutely clean, i.e. no data file was left under
the root directory /some/hdfs/directory matching our path and file name
pattern. The stack trace below was generated and shows that the job tried
to recover from its last state and expected an existing data file.

java.lang.Exception: Could not restore checkpointed state to operators and functions
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:552)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:250)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Failed to restore state to function: Error while restoring RollingSink state.
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:544)
    ... 3 more
Caused by: java.lang.RuntimeException: Error while restoring RollingSink state.
    at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:680)
    at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:123)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
    ... 4 more
Caused by: java.io.FileNotFoundException: File does not exist: /some/hdfs/directory/2016-11-07/part-0-0
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLease(FSNamesystem.java:2877)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.recoverLease(NameNodeRpcServer.java:753)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.recoverLease(ClientNamenodeProtocolServerSideTranslatorPB.java:671)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2206)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2202)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2200)

    at sun.reflect.GeneratedConstructorAccessor133.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.recoverLease(DFSClient.java:1247)
    at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:279)
    at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:275)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.recoverLease(DistributedFileSystem.java:291)
    at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:625)
    ... 6 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/flink/kraft/kraft-vorschlag/2016-11-07/part-0-0
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLease(FSNamesystem.java:2877)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.recoverLease(NameNodeRpcServer.java:753)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.recoverLease(ClientNamenodeProtocolServerSideTranslatorPB.java:671)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2206)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2202)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2200)

    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy13.recoverLease(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.recoverLease(ClientNamenodeProtocolTranslatorPB.java:603)
    at sun.reflect.GeneratedMethodAccessor59.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy14.recoverLease(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.recoverLease(DFSClient.java:1245)
    ... 11 more
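
If it helps to narrow this down: the innermost exception can be reproduced in
isolation with a few lines against the HDFS client, i.e. by calling
recoverLease() on a part file that is no longer there. This is only a minimal
sketch; the class name is illustrative and the path is taken from the stack
trace above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class RecoverLeaseCheck {
        public static void main(String[] args) throws Exception {
            // Assumes fs.defaultFS points at the cluster and the hadoop-hdfs client
            // is on the classpath.
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);

            // Path taken from the stack trace; adjust to your own bucket directory.
            Path partFile = new Path("/some/hdfs/directory/2016-11-07/part-0-0");

            if (fs instanceof DistributedFileSystem) {
                DistributedFileSystem dfs = (DistributedFileSystem) fs;
                // Throws a FileNotFoundException (wrapped in a RemoteException) if the
                // file is gone, which is what the restore step reports after the
                // directories were emptied.
                boolean closed = dfs.recoverLease(partFile);
                System.out.println("Lease recovered, file closed: " + closed);
            }
        }
    }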

Has anyone out there had a similar experience, or any clue how to stop
Flink/YARN/HDFS from doing this?

Greets

Dominique

