K8s native - checkpointing to S3 with RockDBStateBackend

Averell
Hi,
I am trying to deploy my job to Kubernetes following the native Kubernetes
guide. My job checkpoints to S3 with RocksDBStateBackend. It also has an
S3 StreamingFileSink.
My jar file already includes flink-hadoop-fs, flink-connector-filesystem,
and flink-s3-fs-hadoop (as I understand it, these are for the S3 sink;
please correct me if I'm wrong).

When I tried to submit the job, I got the following error only a few
seconds after submitting: "Could not find a file system implementation for
scheme 's3'. The scheme is not directly supported by Flink and no Hadoop
file system to support this scheme could be loaded."

I'm not sure how to get past this.
Using s3a didn't help (s3 works fine when running on my dev machine).
I also tried copying flink-shaded-hadoop-2-uber-2.8.3-10.0.jar to the
/opt/flink/lib/ folder of the JobManager pod, but that didn't help either
(was it already too late? Does the jar need to be there before the JM is
started?)

Thanks for your help.
Averell


Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:282)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:205)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:486)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
        at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:255)
        at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:227)
        at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:215)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
        at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
        at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
        at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
        ... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
        at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:64)
        at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:490)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:477)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:279)
        ... 23 more



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: K8s native - checkpointing to S3 with RockDBStateBackend

Yun Tang
Hi Averell,

Please build your own Flink Docker image with the S3 plugin, as the official documentation describes [1].


Best
Yun Tang

(In reply to Averell's message of Thursday, April 23, 2020, 20:58.)

Re: K8s native - checkpointing to S3 with RockDBStateBackend

Averell
Thank you Yun Tang.
Building my own docker image as suggested solved my problem.

However, I don't understand why that is necessary when the
flink-s3-fs-hadoop jar is already included in my uber jar.

Thanks.
Regards,
Averell




Re: K8s native - checkpointing to S3 with RockDBStateBackend

David Magalhães
I think the classloaders for the uber jar and for Flink itself are different. I'm not sure this is the full explanation, but that is why you need to add flink-s3-fs-hadoop to the plugins folder in the cluster.
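
A quick way to confirm that the jar ended up where the plugin mechanism looks for it is to scan the plugins directory from inside the pod. The following is only a sketch under some assumptions: the class and method names are hypothetical, /opt/flink is taken as the Flink home as in the thread, and it relies on the convention that each plugin lives in its own subfolder of plugins/ (for example plugins/s3-fs-hadoop/), so a jar dropped directly into plugins/ is not counted.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Flink.
final class PluginLayoutCheck {

    /**
     * Returns true if some direct subdirectory of pluginsDir contains a jar
     * whose file name starts with jarPrefix, for example
     * plugins/s3-fs-hadoop/flink-s3-fs-hadoop-<version>.jar.
     */
    static boolean hasPluginJar(Path pluginsDir, String jarPrefix) throws IOException {
        if (!Files.isDirectory(pluginsDir)) {
            return false;
        }
        try (DirectoryStream<Path> subDirs =
                Files.newDirectoryStream(pluginsDir, p -> Files.isDirectory(p))) {
            for (Path subDir : subDirs) {
                // The glob is matched against the file name only.
                try (DirectoryStream<Path> jars =
                        Files.newDirectoryStream(subDir, jarPrefix + "*.jar")) {
                    if (jars.iterator().hasNext()) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: /opt/flink/plugins is the plugins directory inside the image.
        Path pluginsDir = Paths.get(args.length > 0 ? args[0] : "/opt/flink/plugins");
        System.out.println("flink-s3-fs-hadoop plugin found: "
                + hasPluginJar(pluginsDir, "flink-s3-fs-hadoop"));
    }
}
```

If this prints false on the JobManager pod, the image most likely does not carry the plugin in a subfolder of its own.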

(In reply to Averell's message of Fri, Apr 24, 2020, 4:07 PM.)

Re: K8s native - checkpointing to S3 with RockDBStateBackend

Yang Wang
Hi Averell,

I think David's answer is right. The user uber jar is loaded lazily by the user classloader,
so Flink's system classes cannot see it. You need to either put the jar directly in the
/opt/flink/lib directory or load it via the plugin mechanism.
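
To make the point concrete, here is a deliberately simplified model of a scheme lookup, written as a sketch rather than as Flink's actual code. The class name, the string "implementations" and the map-based registry are all hypothetical. The one idea it illustrates is that the table of known schemes is filled once, from what the system side can see at start-up (lib/ and plugins/), and that classes arriving later inside the user jar are never added to it.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model, not Flink's FileSystem class.
final class SchemeRegistryModel {

    /** Schemes known to the "system" side, fixed once at process start-up. */
    private final Map<String, String> implementationsByScheme = new HashMap<>();

    /** startupImplementations models what is discoverable from lib/ and plugins/. */
    SchemeRegistryModel(Map<String, String> startupImplementations) {
        implementationsByScheme.putAll(startupImplementations);
    }

    /** Resolves the implementation for the URI's scheme; the user jar is never consulted. */
    String resolve(URI uri) {
        String implementation = implementationsByScheme.get(uri.getScheme());
        if (implementation == null) {
            throw new IllegalStateException(
                    "Could not find a file system implementation for scheme '"
                            + uri.getScheme() + "'");
        }
        return implementation;
    }

    public static void main(String[] args) {
        Map<String, String> withPlugin = new HashMap<>();
        withPlugin.put("file", "LocalFileSystem");
        withPlugin.put("s3a", "S3 plugin file system");

        SchemeRegistryModel registry = new SchemeRegistryModel(withPlugin);
        System.out.println(registry.resolve(URI.create("s3a://my-bucket/checkpoints")));
    }
}
```

In this model, bundling an S3 implementation into the job jar changes nothing, because the registry was populated before the job jar was loaded; only changing what is present at start-up does.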

Best,
Yang

(In reply to David Magalhães's message of Saturday, April 25, 2020, 12:05 AM.)

Re: K8s native - checkpointing to S3 with RockDBStateBackend

Averell
Hi David, Yang,

Thanks. But I just tried to submit the same job on a YARN cluster using that
same uberjar, and it was successful. I don't have flink-s3-fs-hadoop.jar
anywhere in the lib or plugin folder.

Thanks and regards,
Averell




Re: K8s native - checkpointing to S3 with RockDBStateBackend

Yang Wang
Hi Averell,

Hadoop supports S3AFileSystem directly. When you deploy a Flink job
on YARN, the Hadoop classpath is added to the JobManager and TaskManager
automatically. That means you can use the "s3a" scheme without putting
flink-s3-fs-hadoop.jar in the plugins directory.

In a K8s deployment, there is no Hadoop filesystem by default, so
you need to add it manually.
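
One way to see this difference between the two environments is to probe the system classloader for the Hadoop classes. The snippet below is a small diagnostic sketch: the class name ClasspathProbe is hypothetical, and it only checks whether a class can be located, without initializing it. Run with the same classpath as the JobManager, it would be expected to report the Hadoop classes as visible on YARN and as missing in a plain K8s image, though that depends on how the environment is set up.

```java
// Hypothetical diagnostic helper, not part of Flink or Hadoop.
final class ClasspathProbe {

    /** Returns true if the named class can be located by the given loader. */
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            // initialize=false: locate the class without running static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader system = ClassLoader.getSystemClassLoader();
        String[] candidates = {
            "org.apache.hadoop.fs.FileSystem",
            "org.apache.hadoop.fs.s3a.S3AFileSystem"
        };
        for (String name : candidates) {
            System.out.println(name + " visible: " + isVisible(name, system));
        }
    }
}
```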


Best,
Yang

(In reply to Averell's message of Monday, April 27, 2020, 1:46 PM.)