Hi,
I am trying to deploy my job to Kubernetes following the native-Kubernetes guide. My job checkpoints to S3 with RocksDBStateBackend, and it also has an S3 StreamingFileSink. My jar file already includes flink-hadoop-fs, flink-connector-filesystem, and flink-s3-fs-hadoop (to my understanding, these are for the S3 sink; please correct me if I'm wrong).

When I tried to submit the job, I got the following error only a few seconds after submitting:

    Could not find a file system implementation for scheme 's3'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.

Not sure how I can get past this. Using s3a didn't help (s3 works well when running on my dev machine). I also tried to copy flink-shaded-hadoop-2-uber-2.8.3-10.0.jar to the /opt/flink/lib/ folder of the JobManager pod, but it didn't help (is that already too late? should the jar be there before the JM is started?).

Thanks for your help.
Averell

    Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:282)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:205)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:486)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
        at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:255)
        at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:227)
        at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:215)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
        at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
        at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
        at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
        ... 10 more
    Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
        at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:64)
        at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:490)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:477)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:279)
        ... 23 more
Hi Averell,
Please build your own Flink Docker image with the S3 plugin, as described in the official doc [1].
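For example, a minimal Dockerfile might look like this (the base image tag and jar version are assumptions; match them to the Flink version you deploy):

    FROM flink:1.10.0-scala_2.11
    # The official image ships the S3 filesystem jar under /opt/flink/opt;
    # copying it into its own folder under plugins/ activates the plugin.
    RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \
        cp /opt/flink/opt/flink-s3-fs-hadoop-1.10.0.jar /opt/flink/plugins/s3-fs-hadoop/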
Best
Yun Tang
Thank you Yun Tang.
Building my own Docker image as suggested solved my problem. However, I don't understand why that is needed when the flink-s3-fs-hadoop jar is already included in my uber jar.

Thanks.
Regards,
Averell
I think the classloaders for the uber jar and for Flink itself are different. Not sure if this is the right explanation, but that is why you need to add flink-s3-fs-hadoop to the plugins folder in the cluster (the sketch below shows where the scheme resolution is triggered).
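For illustration, here is a minimal sketch of a job setup like the one described (bucket name and job name are placeholders): the checkpoint URI is resolved by Flink's core FileSystem classes on the JobManager, before any user code from the uber jar is involved, which is why a filesystem bundled only in the uber jar is not found.

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetupSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // checkpoint every minute
            // "s3://my-bucket/checkpoints" is a placeholder URI; its scheme is
            // looked up via FileSystem.get on the JobManager side, using the
            // system/plugin classloaders -- not the user-code classloader.
            env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints"));
            env.fromElements(1, 2, 3).print();
            env.execute("classloader-illustration");
        }
    }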
Hi Averell,

I think David's answer is right. The user uber jar is loaded lazily by the user-code classloader, so a filesystem inside it cannot be picked up by Flink's system classes. You need to either put the jar directly into the /opt/flink/lib directory or load it via the plugin mechanism (see the sketch below).

Best,
Yang
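As an illustration of the two options, something like the following could be run (or baked into the image) before the JobManager starts; the jar path and version are assumptions based on the official image layout:

    # Option 1 (plugin mechanism): each plugin gets its own isolated classloader
    mkdir -p /opt/flink/plugins/s3-fs-hadoop
    cp /opt/flink/opt/flink-s3-fs-hadoop-1.10.0.jar /opt/flink/plugins/s3-fs-hadoop/

    # Option 2 (system classpath): visible to Flink's own classes directly
    # cp /opt/flink/opt/flink-s3-fs-hadoop-1.10.0.jar /opt/flink/lib/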
Hi David, Yang,
Thanks. But I just tried to submit the same job to a YARN cluster using that same uber jar, and it was successful, even though I don't have flink-s3-fs-hadoop.jar anywhere in the lib or plugins folder there.

Thanks and regards,
Averell
Hi Averell,

Hadoop itself supports S3AFileSystem directly. When you deploy a Flink job on YARN, the Hadoop classpath is added to the JobManager/TaskManager automatically, which means you can use the "s3a" scheme without putting flink-s3-fs-hadoop.jar in the plugin directory. In a K8s deployment there is no Hadoop filesystem available by default, so you need to provide one manually (see the sketch below).

Best,
Yang
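A sketch of the YARN-side difference (the export line is the usual way to hand Flink the Hadoop classpath; the jar name is a placeholder):

    # On YARN, hadoop-aws (which provides S3AFileSystem) typically comes in
    # via the Hadoop classpath exported before submission:
    export HADOOP_CLASSPATH=$(hadoop classpath)
    flink run -m yarn-cluster ./my-job.jar

    # On the K8s image there is no Hadoop installation, so the s3/s3a schemes
    # must come from the flink-s3-fs-hadoop plugin instead.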