Using s3 for checkpointing

Using s3 for checkpointing

Navneeth Krishnan
Hi All,

I'm trying to migrate from NFS to S3 for checkpointing and I'm facing a few issues. I have Flink running in Docker with the flink-s3-fs-hadoop jar copied to the plugins folder. Even with the jar in place I'm getting the following error: Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies. Am I missing something?

The documentation says "Presto is the recommended file system for checkpointing to S3". How can I enable this? Is there a specific configuration I need to set for it?

Also, I couldn't figure out how entropy injection works. Should I just create the bucket with a checkpoints folder and let Flink automatically inject the entropy and create a per-job checkpoint folder, or do I need to create it myself?

bucket/checkpoints/_entropy_/dashboard-job/ 
s3.entropy.key: _entropy_
s3.entropy.length: 4 (default)
Thanks
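
For what it's worth, entropy injection should not require pre-creating any directories beyond the bucket; it is driven by flink-conf.yaml plus the checkpoint path. A minimal sketch, using a hypothetical bucket/job name and the s3p scheme from the Presto plugin:

s3.entropy.key: _entropy_
s3.entropy.length: 4
state.checkpoints.dir: s3p://my-bucket/checkpoints/_entropy_/dashboard-job

With a setup like this, Flink should create the per-job checkpoint directories itself, replacing _entropy_ with four random characters for checkpoint data files (and stripping the segment for the checkpoint metadata path).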

Re: Using s3 for checkpointing

Arvid Heise
Hi Navneeth,

Did you follow the plugin folder structure? [1]

There is another plugin called flink-s3-fs-presto that you can use.
If you want to use both plugins, use s3a:// for s3-fs-hadoop (output) and s3p:// for s3-fs-presto (checkpointing).
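
For example, with both plugins installed, the split could look like this in flink-conf.yaml (bucket names here are only placeholders):

state.checkpoints.dir: s3p://my-bucket/checkpoints
high-availability.storageDir: s3p://my-bucket/recovery

while sinks in the job itself write to s3a://my-bucket/... paths. Spelling out the dedicated schemes also avoids any ambiguity about which plugin handles a plain s3:// URI.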



Re: Using s3 for checkpointing

Navneeth Krishnan
Hi Arvid,

Thanks for the response. 

I have both jars under /opt/flink/plugins but I'm still getting the same error message. Also, can someone please provide some pointers on how entropy injection works? How should I set up the directory structure?

In the link you provided there is an aws-credentials.jar file; I'm not sure where to get that jar since I don't see it in the Maven repo.

Plugins:
flink-s3-fs-hadoop-1.9.1.jar
flink-s3-fs-presto-1.9.1.jar

flink-conf:
high-availability.storageDir: s3p://.../recovery

Error message:

Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.


Thanks

Re: Using s3 for checkpointing

David Magalhães
Did you put each jar inside a separate folder named after it, like /opt/flink/plugins/s3-fs-presto/flink-s3-fs-presto-1.9.1.jar?
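
In other words, the expected layout is one sub-directory per plugin under the plugins folder (the folder names themselves are arbitrary), roughly:

/opt/flink/plugins/
  s3-fs-hadoop/
    flink-s3-fs-hadoop-1.9.1.jar
  s3-fs-presto/
    flink-s3-fs-presto-1.9.1.jar

Each sub-directory gets its own class loader, which is why jars dropped directly into /opt/flink/plugins/ are not picked up.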



Re: Using s3 for checkpointing

Navneeth Krishnan
Thanks, David. It worked after placing each jar inside its own folder.
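
For anyone hitting the same thing on a Docker setup, the fix amounts to something like the following at image-build or entrypoint time (a sketch only; it assumes the distribution lives under /opt/flink and the optional S3 jars ship in /opt/flink/opt, so adjust the paths if your image differs):

mkdir -p /opt/flink/plugins/s3-fs-hadoop /opt/flink/plugins/s3-fs-presto
cp /opt/flink/opt/flink-s3-fs-hadoop-1.9.1.jar /opt/flink/plugins/s3-fs-hadoop/
cp /opt/flink/opt/flink-s3-fs-presto-1.9.1.jar /opt/flink/plugins/s3-fs-presto/

After a restart the s3p:// scheme should resolve and the UnsupportedFileSystemSchemeException goes away.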
