Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?


Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?

Clifford Resnick

Using Flink 1.1-SNAPSHOT, Hadoop-aws 2.6.4

 

The error I’m getting is :

 

11:05:44,425 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Caught exception while materializing asynchronous checkpoints.

com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/t8/k5764ltj4sq4ft06c1zp0nxn928mwr/T/flink-io-247956be-e422-4222-a512-e3ae321b1590/ede87211c622f86d1ef7b2b323076e79/WindowOperator_10_3/dummy_state/31b7ca7b-dc94-4d40-84c7-4f10ebc644a2/local-chk-1 (Is a directory)

                at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1266)

                at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)

                at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)

                at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139)

                at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47)

                at java.util.concurrent.FutureTask.run(FutureTask.java:266)

                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

                at java.lang.Thread.run(Thread.java:745)

 

In the debugger I noticed that some of the uploaded checkpoints come from the configured /tmp location. Those succeed, since the file in the request is a fully qualified file, but I guess it’s different for WindowOperators? Here the file in the request (under a different /var/folders.. location not configured by me – must be a Mac thing?) is actually a directory. The AWS API fails when it tries to calculate an MD5 hash of the directory. The Flink side of the code path is hard to discern from debugging because it’s asynchronous.

 

I get the same issue whether running locally or on a CentOS-based YARN cluster. Everything works if I use HDFS instead. Any insight will be greatly appreciated! When I get a chance later I may try S3N, or perhaps S3A with MD5 verification skipped.

 

-Cliff

 

 

 

 


Re: Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?

Ufuk Celebi
Hey Cliff!

I was able to reproduce this by locally running a job with RocksDB semi-
asynchronous checkpoints (the current default) to S3A. I've created an
issue here: https://issues.apache.org/jira/browse/FLINK-4228.

Running with S3N it works as expected. You can use that
implementation as a workaround (a rough configuration sketch follows).
I don't know whether it's possible to disable the creation of MD5
hashes for S3A.
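
A minimal sketch of the S3N workaround, assuming the standard Hadoop s3n configuration keys; the bucket name, credentials, and paths below are placeholders, not anything from this thread:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3NWorkaroundCheck {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Standard Hadoop keys for the s3n:// scheme (normally set in core-site.xml).
        conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        conf.set("fs.s3n.awsAccessKeyId", "ACCESS_KEY");
        conf.set("fs.s3n.awsSecretAccessKey", "SECRET_KEY");

        // Sanity-check that the bucket is reachable before pointing Flink's
        // checkpoint directory at an s3n:// URI.
        FileSystem fs = FileSystem.get(URI.create("s3n://my-bucket/"), conf);
        System.out.println(fs.exists(new Path("s3n://my-bucket/flink/checkpoints")));
    }
}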

– Ufuk


Re: Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?

Clifford Resnick
In reply to this post by Clifford Resnick

The root cause of this problem seems to be that Flink is copying directories with the FileSystem API. Unfortunately, unlike the default HDFS implementation, org.apache.hadoop.fs.s3a.S3AFileSystem does not implement a recursive copyFromLocalFile, and Flink 1.0.3 fails when it tries to copy a WindowOperator savepoint directory. Flink 1.1 is worse, as it cannot even set up the session because it tries to copy the flink/lib dir on init.

 

I can work around this in 1.0.3 by subclassing S3AFileSystem and implementing a recursive copyFromLocalFile (a rough sketch follows). Unfortunately, this isn’t good enough for Flink 1.1, since it expects the copied “lib” directory to exist in the cache to set up the classpath with (I think).
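
Roughly, the subclass just walks local directories and defers to the stock single-file upload for each file. This is an untested sketch against the Hadoop 2.6.x FileSystem API; the class name is made up and the delete-source handling is simplified:

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

public class RecursiveS3AFileSystem extends S3AFileSystem {

    @Override
    public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
            throws IOException {
        File localFile = new File(src.toUri().getPath());
        if (localFile.isDirectory()) {
            // Recurse into the directory and upload each child under dst.
            File[] children = localFile.listFiles();
            if (children != null) {
                for (File child : children) {
                    copyFromLocalFile(delSrc, overwrite,
                            new Path(src, child.getName()),
                            new Path(dst, child.getName()));
                }
            }
            if (delSrc) {
                localFile.delete();
            }
        } else {
            // A plain file can go through the normal S3A putObject path.
            super.copyFromLocalFile(delSrc, overwrite, src, dst);
        }
    }
}

The subclass can then be registered by pointing fs.s3a.impl at it in core-site.xml.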

 

I’m really hoping there is something simple that I’m missing here that someone can fill me in on. Is anyone else successfully up and running with Flink -> YARN -> S3A? If so, what versions of Hadoop and Flink, and did you do anything other than configure core-site.xml?

 

-Cliff

 

 


Re: Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?

Clifford Resnick
In reply to this post by Ufuk Celebi
Hi Ufuk,

My mail was down, so I missed this response. Thanks for that.


Re: Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?

Ufuk Celebi
Hey Cliff! Good to see that we came to the same conclusion :-) What do
you mean by the copying of the "lib" folder? This issue should be the
same for both 1.0 and 1.1. Another workaround could be to use the
fully async RocksDB snapshots with Flink 1.1-SNAPSHOT (sketch below).
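
For illustration, enabling that mode might look like the following. This is a hedged sketch assuming the 1.1-SNAPSHOT RocksDBStateBackend API (enableFullyAsyncSnapshots) and a placeholder bucket/path:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FullyAsyncCheckpointExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder S3A checkpoint URI; bucket and path are illustrative.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("s3a://my-bucket/flink/checkpoints");

        // Fully asynchronous snapshots serialize state to a single stream,
        // avoiding the directory upload that trips up S3A's putObject.
        backend.enableFullyAsyncSnapshots();

        env.setStateBackend(backend);
        env.enableCheckpointing(60_000);

        // ... build and execute the job ...
    }
}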

If you like, you could also work on the issue I've created by
implementing the recursive file copy in Flink (in HDFSCopyToLocal) and
contributing it via a pull request.

– Ufuk



Re: Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?

Clifford Resnick
In 1.1, AbstractYarnClusterDescriptor pushes the contents of flink/lib (local to where the YARN app is launched) to YARN with a single directory copy. In 1.0.3 it looked like it was copying the individual jars.

So, yes, I did actually change HDFSCopyToLocal (roughly along the lines of the sketch below), which was easy, but the job staging in the above class also needs altering. I’m happy to contribute on both, though I won’t be able to get to it until later this week.
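
The change is essentially a recursive remote-to-local copy. The following is an illustrative sketch against the Hadoop FileSystem API, not Flink's actual HDFSCopyToLocal code; the class and method names are made up:

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class RecursiveCopyToLocal {

    public static void copy(URI remote, File local, Configuration conf) throws IOException {
        FileSystem fs = FileSystem.get(remote, conf);
        copy(fs, new Path(remote), local);
    }

    private static void copy(FileSystem fs, Path remote, File local) throws IOException {
        if (fs.getFileStatus(remote).isDirectory()) {
            // Recreate the directory locally, then recurse into its children.
            if (!local.mkdirs() && !local.isDirectory()) {
                throw new IOException("Cannot create local directory " + local);
            }
            for (FileStatus child : fs.listStatus(remote)) {
                copy(fs, child.getPath(), new File(local, child.getPath().getName()));
            }
        } else {
            // Single files work on every FileSystem implementation, including S3A.
            fs.copyToLocalFile(remote, new Path(local.toURI()));
        }
    }
}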

-Cliff




Re: Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?

Ufuk Celebi
Feel free to do the contribution at any time you like. We can also
always make it part of a bugfix release if it does not make it into
the upcoming 1.1 RC (probably end of this week or beginning of next).
Ping me if you need any feedback or pointers.

– Ufuk



Re: Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?

Clifford Resnick
I have a fix and a test for a recursive HDFSCopyToLocal. I also added similar code to the YARN application staging. However, even though all files and resources now copy correctly, S3A still fails on Flink session creation. The failure stems from the lib folder being registered as an application resource (as opposed to its individual contents). Since there is no such thing as a directory in S3, there is no file creation timestamp, and the local folder resource fails with the following error:

java.io.IOException: Resource s3a://mm-dev-flink-savepoints/user/ec2-user/.flink/application_1469150519301_0014/lib changed on src filesystem (expected 0, was 1469155595413
        at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
        at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
        at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
        at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
        at java.security.AccessController.doPrivileged(Native Method)..

I was stumped as to why NativeS3FileSystem (S3N) does _not_ fail, but reading this perhaps answers why: http://stackoverflow.com/questions/15619712/newly-created-s3-directory-has-1969-12-31-as-timestamp

Everything works for all FileSystem implementations if I change the staging code to expand directory resources into file resources, effectively flattening all resources into the base directory (a rough sketch follows). The only issue with this approach would be if there are like-named classpath resources in nested directories, but apparently the current implementation only copies jars, so perhaps that’s a non-issue.
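
The flattening amounts to copying each file under flink/lib into the staging directory individually instead of shipping the directory as one resource. A simplified, illustrative sketch (the class and method names are made up, and the actual Flink staging code in AbstractYarnClusterDescriptor differs in detail):

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class FlattenedStaging {

    public static void stageLibDirectory(File libDir, FileSystem fs, Path stagingDir)
            throws IOException {
        File[] files = libDir.listFiles();
        if (files == null) {
            return;
        }
        for (File file : files) {
            if (file.isFile()) {
                Path remote = new Path(stagingDir, file.getName());
                // One upload per file: S3A then has a real object (with a timestamp)
                // for every staged resource, so YARN's timestamp check can pass.
                fs.copyFromLocalFile(new Path(file.toURI()), remote);
                // ... register 'remote' as an individual YARN LocalResource here ...
            }
        }
    }
}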

Short of altering S3A to perform the linked “hack”, I don’t see how Flink/YARN/S3A can work as currently implemented. I can add the resource directory flattening to my impending PR, but I just want to be sure to first mention the risk (like-named nested resources).

BTW, if anyone is wondering why I’m interested in S3A over S3N, it’s for this: https://issues.apache.org/jira/browse/HADOOP-11183. In-memory multipart writes would be a great way to use the rolling file appender.

-Cliff





Re: Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?

Clifford Resnick
In reply to this post by Clifford Resnick

I took another look at this, and it occurred to me that the S3A directory issue is actually localized to Cloudera's hadoop-aws version, which is stuck at 2.6.0. Apparently the zeroed-out directory timestamps are there in the Flink-recommended version. So Flink/YARN/S3A will work, just not with CDH5. I'll verify and submit the original PR tomorrow morning EST.

