BucketingSink - Could not invoke truncate while recovering from state


BucketingSink - Could not invoke truncate while recovering from state

sohimankotia
Hi,

I have a streaming job (Flink 1.5.5) running on a Hortonworks Hadoop
cluster with parallelism 150 (25 * 6). It has checkpointing enabled.

One of the Hadoop hosts went down, which caused the containers to restart
and the job to recover from the last checkpoint.

But it could not restart due to the following exception; it went into a
loop trying to recover from state.


org.apache.flink.streaming.connectors.fs.BucketingSink - Could not invoke truncate
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.streaming.connectors.fs.BucketingSink.handlePendingInProgressFile(BucketingSink.java)
        at org.apache.flink.streaming.connectors.fs.BucketingSink.handleRestoredBucketState(BucketingSink.java)
        at org.apache.flink.streaming.connectors.fs.BucketingSink.initializeState(BucketingSink.java)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:277)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to TRUNCATE_FILE /pipeline/prod/20190219/15/part-1550570070550-57-1 for DFSClient_NONMAPREDUCE_-706922361_126 on <host> because DFSClient_NONMAPREDUCE_-706922361_126 is already the current lease holder.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3109)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInternal(FSNamesystem.java:2256)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInt(FSNamesystem.java:2202)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2172)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1056)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:610)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2345)

        at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
        at org.apache.hadoop.ipc.Client.call(Client.java:1498)
        at org.apache.hadoop.ipc.Client.call(Client.java:1398)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
        at com.sun.proxy.$Proxy17.truncate(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.truncate(ClientNamenodeProtocolTranslatorPB.java:330)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:291)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:203)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:185)
        at com.sun.proxy.$Proxy18.truncate(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient.truncate(DFSClient.java:2073)
        at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:797)


Any immediate solution would be helpful.

Thanks
Sohi



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: BucketingSink - Could not invoke truncate while recovering from state

sohimankotia
Any help ?

Re: BucketingSink - Could not invoke truncate while recovering from state

Gyula Fóra
Hi all!

I am going to try to resurrect this thread, as I think I have hit the same issue with the StreamingFileSink: https://issues.apache.org/jira/browse/FLINK-13874

I don't have a good answer but it seems that we try to truncate before we get the lease (even though there is logic both in BucketingSink and HadoopRecoverable... to wait before calling truncate).
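For reference, the "wait before calling truncate" logic boils down to polling the NameNode until the previous lease is released, with a deadline. The sketch below is not Flink's actual code, just a minimal self-contained model of that idea; the `BooleanSupplier` stands in for a real check such as `DistributedFileSystem#isFileClosed(path)`:

```java
import java.util.function.BooleanSupplier;

// Toy model of the lease-wait loop performed before truncate during recovery.
// isFileClosed stands in for DistributedFileSystem#isFileClosed(path).
class LeaseWaiter {
    public static boolean waitUntilLeaseIsRevoked(BooleanSupplier isFileClosed,
                                                  long timeoutMs,
                                                  long pollIntervalMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!isFileClosed.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                return false; // lease still held when the deadline passed: give up
            }
            try {
                Thread.sleep(pollIntervalMs); // back off before asking the NameNode again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true; // lease released: safe to truncate
    }
}
```

In the real sinks, truncate is supposed to happen only after a loop like this succeeds; the failure discussed in this thread suggests the check can pass (or be bypassed) while the same client still holds the lease.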

Does anyone have any idea?

Cheers,
Gyula

On Sun, Feb 24, 2019 at 4:13 AM sohimankotia <[hidden email]> wrote:
Any help ?




Re: BucketingSink - Could not invoke truncate while recovering from state

Kostas Kloudas-2
Hi Gyula,

Thanks for looking into it.
I did not dig into it but in the trace you posted there is the line:

Failed to TRUNCATE_FILE ... for
**DFSClient_NONMAPREDUCE_-1189574442_56** on 172.31.114.177 because
**DFSClient_NONMAPREDUCE_-1189574442_56 is already the current lease
holder**.

The client seems to be the same, so could it be that we are trying to
get the lease twice from the same task and the lease is not
"reentrant"?

Cheers,
Kostas

On Tue, Aug 27, 2019 at 4:53 PM Gyula Fóra <[hidden email]> wrote:


Re: BucketingSink - Could not invoke truncate while recovering from state

Gyula Fóra
That's a nice observation! :) I hadn't caught that. We need to check that for sure.

Gyula

On Tue, Aug 27, 2019 at 5:00 PM Kostas Kloudas <[hidden email]> wrote: