Hi,

I am running Flink 1.10.1, initially on my local development machine (a MacBook Pro). I'm struggling to understand how to write to Google Cloud Storage using the StreamingFileSink (S3 works fine). The error I am seeing:

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:117)

I have put gcs-connector-hadoop2-latest.jar in a subdirectory of plugins/:

plugins
├── gcs-connector
│   └── gcs-connector-hadoop2-latest.jar

In flink-conf.yaml I have added:

fs.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
google.cloud.auth.service.account.enable: true
google.cloud.auth.service.account.json.keyfile: ~/key.json

This mirrors the setup I used for S3 storage. My implementation is a simple test reading data from a Kinesis stream and outputting to GCS:

DataStream<String> input = getKinesisSource(env, kinesisStream);

final StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("gs://some-gcp-bucket"), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(
                DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(2))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build())
        .build();

//input.print();
input.addSink(sink);

Not sure what else to try. Any pointers appreciated.

Sent with ProtonMail Secure Email.
Hi,

thanks for reaching out to the Flink community. The tricky thing here is that the Google Cloud Storage connector is not supported by Flink's plugin system, as stated in [1].

There is a blog post on how to get started with Flink on Google Cloud Platform [2]. In case you haven't seen it yet: it has a subsection "Advanced: Set up access to Google Cloud Storage for checkpoints and savepoints" describing the old way of adding file system support, specifically showcasing Google Cloud Storage. There you're asked to copy the connector into Flink's lib/ directory instead. In addition, you have to add the Hadoop dependencies to the lib/ folder as well. For this, it's advisable to use a bundled Hadoop lib provided by the Flink community [3] to avoid name clashes on the classpath.

I hope this helps.

Best,
Matthias

On Fri, Nov 13, 2020 at 2:32 PM orionemail <[hidden email]> wrote:
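[Editor's note: to make the lib/-based setup above concrete, here is a minimal sketch. The exact jar versions shown are illustrative assumptions, not prescriptions; use the bundled Hadoop jar from [3] that matches your Hadoop version, and keep the config keys you already have.]

lib
├── (existing Flink distribution jars)
├── flink-shaded-hadoop-2-uber-<version>.jar      # bundled Hadoop from [3]; version is an assumption
└── gcs-connector-hadoop2-latest.jar              # moved here from plugins/gcs-connector/

flink-conf.yaml (same keys as in your current setup; an absolute path for the keyfile is safer than ~):

fs.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
google.cloud.auth.service.account.enable: true
google.cloud.auth.service.account.json.keyfile: /path/to/key.json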
Matthias Pohl | Engineer

Follow us @VervericaData

Ververica -- Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner