Assistance configuring access to GoogleCloudStorage for row format streaming file sink


orionemail
Hi,

I am running Flink 1.10.1, initially on my local development machine (a MacBook Pro). I'm struggling to understand how to write to Google Cloud Storage using the StreamingFileSink (S3 works fine).

The error I am seeing:

"org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:117)"


I have put gcs-connector-hadoop2-latest.jar in a subdirectory of plugins/:

plugins
├── gcs-connector
│   └── gcs-connector-hadoop2-latest.jar

In flink-conf.yaml I have added:

fs.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
google.cloud.auth.service.account.enable: true
google.cloud.auth.service.account.json.keyfile: ~/key.json

This mirrors the setup I used for S3 storage.
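
For reference, the working S3 setup looks roughly like this (a sketch; the plugin directory layout and config keys follow the Flink S3 filesystem docs, the jar version is assumed to match Flink 1.10.1, and credential values are elided):

plugins
├── s3-fs-hadoop
│   └── flink-s3-fs-hadoop-1.10.1.jar

with, in flink-conf.yaml:

s3.access-key: <access-key>
s3.secret-key: <secret-key>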

My implementation is a simple test reading data from a Kinesis stream and writing to GCS.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

DataStream<String> input = getKinesisSource(env, kinesisStream);

// Row-format sink writing UTF-8 strings to the GCS bucket; part files roll
// every 2 minutes, after 1 minute of inactivity, or at 1 GiB.
final StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("gs://some-gcp-bucket"), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(
                DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(2))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build())
        .build();

// input.print();
input.addSink(sink);


Not sure what else to try.  Any pointers appreciated.



Sent with ProtonMail Secure Email.


Re: Assistance configuring access to GoogleCloudStorage for row format streaming file sink

Matthias
Hi,
thanks for reaching out to the Flink community. The tricky thing here is that the Google Cloud Storage connector is not supported by Flink's plugin system, as stated in [1]. There is a blog post on how to get started with Flink on Google Cloud Platform [2]. In case you haven't seen it yet: it has a subsection "Advanced: Set up access to Google Cloud Storage for checkpoints and savepoints" describing the old way of adding support for file systems, using Google Cloud Storage as the showcase. There you're asked to copy the connector into Flink's lib/ directory instead. In addition to that, you have to add the Hadoop dependencies to the lib/ folder as well. For this, it's advisable to use a bundled Hadoop provided by the Flink community [3] to avoid name clashes on the classpath.
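
For illustration, lib/ would then contain both jars, something like this (a sketch; the shaded-Hadoop artifact name and version are assumptions, pick the bundled build matching your environment [3]):

lib
├── flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
└── gcs-connector-hadoop2-latest.jar

The GCS settings would then typically go into a core-site.xml that Hadoop can pick up, e.g. by pointing env.hadoop.conf.dir in flink-conf.yaml at the directory containing it (same keys as in your flink-conf.yaml attempt; the keyfile path below is a placeholder):

<configuration>
  <property>
    <name>fs.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
  </property>
  <property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>true</value>
  </property>
  <property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>/path/to/key.json</value>
  </property>
</configuration>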

I hope this helps.

Best,
Matthias





--

Matthias Pohl | Engineer

