Difficulties with Minio state storage


Difficulties with Minio state storage

Rex Fenley
Hello!

I'm trying to test out MinIO as a state storage backend using docker-compose on my local machine, but I keep running into errors that seem strange to me. Any help would be much appreciated :)

The problem:
With the following environment:

environment:
  - |
    FLINK_PROPERTIES=
    jobmanager.rpc.address: flink-jobmanager
    parallelism.default: 2
    s3.access-key: <key>
    s3.secret-key: <key>
    s3.path.style.access: true

And the following state backend (with flink-jdbc-test_graph-minio_1 being the container serving MinIO):

val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
bsEnv.setStateBackend(
  new RocksDBStateBackend(
    "s3://flink-jdbc-test_graph-minio_1/data/checkpoints:9000",
    true
  )
)

And submitting the Flink job and triggering a savepoint from another docker container like so:

flink run -m flink-jdbc-test_flink-jobmanager_1:8081 -c <Job Class Name> <built code>.jar

flink savepoint -m flink-jdbc-test_flink-jobmanager_1:8081 <Job ID> s3://flink-jdbc-test_graph-minio_1:9000/data/savepoints

I end up with the following error:

Caused by: com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: A7E3BB7EEFB524FD; S3 Extended Request ID: cJOtc6E3Kb+U5hgbkA+09Dd/ouDHBGL2ftb1pGHpIwFgd6tE461nkaDtjOj40zbWEpFAcMOEmbY=), S3 Extended Request ID: cJOtc6E3Kb+U5hgbkA+09Dd/ouDHBGL2ftb1pGHpIwFgd6tE461nkaDtjOj40zbWEpFAcMOEmbY= (Path: s3://flink-jdbc-test_graph-minio_1:9000/data/savepoints/savepoint-5c4090-5f90e0cdc603/_metadata)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
at org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
at org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:169)
at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:306)
... 10 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: A7E3BB7EEFB524FD; S3 Extended Request ID: cJOtc6E3Kb+U5hgbkA+09Dd/ouDHBGL2ftb1pGHpIwFgd6tE461nkaDtjOj40zbWEpFAcMOEmbY=)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799)

If I update the environment to include:
...
s3.endpoint: s3://flink-jdbc-test_graph-minio_1:9000
...

Then I end up with the following error just trying to submit the job:
Caused by: java.lang.IllegalArgumentException: Endpoint does not contain a valid host name: s3://flink-jdbc-test_graph-minio_1:9000
at com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:426)
at com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:318)


Changing s3: to http: like so:
s3.endpoint: http://flink-jdbc-test_graph-minio_1:9000

Then I receive the same error as before when trying to submit the job:
Caused by: java.lang.IllegalArgumentException: Endpoint does not contain a valid host name: http://flink-jdbc-test_graph-minio_1:9000
at com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:426)
at com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:318)


However, I can access the MinIO container via the MinIO client from docker just fine:
./mc alias set minio http://flink-jdbc-test_graph-minio_1:9000 key key --api S3v4
But there are no buckets, presumably because saving always fails:
./mc ls minio
<nothing returned>

Does anyone know how to resolve this issue?

Thanks!

--

Rex Fenley | Software Engineer - Mobile and Backend


Re: Difficulties with Minio state storage

Yangze Guo
Hi, Rex,

I've tried to use MinIO as the state backend and everything seems to work well.
Just sharing my configuration:
```
s3.access-key:
s3.secret-key:
s3.endpoint: http://localhost:9000
s3.path.style.access: true
state.checkpoints.dir: s3://flink/checkpoints
```

I think the problem might be caused by one of the following:
- MinIO is not configured correctly.
- You may need to create a bucket first; in my case, I created a bucket called "flink" (see the commands below).
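
For example, with the mc alias from your first mail, creating the bucket ahead of time would look like this (the bucket name "flink" just matches my config above; use whatever bucket your checkpoint path points at):
```
./mc mb minio/flink
./mc ls minio
```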

Best,
Yangze Guo


Re: Difficulties with Minio state storage

Arvid Heise-3
Hi Rex,

You could also check the end-to-end tests that use MinIO in Flink's repo. You definitely need to use an http endpoint.

The setup [1] also uses another way to specify s3.path.style.access (with dashes). I think we needed that especially for Presto. The settings seem to differ a bit across the implementations, so give it a try; it might also be something that we should translate.
For reference, the actual test using Presto can be found here [2].
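
Roughly, the endpoint-related settings would then look like this (just a sketch: the dashed spelling is my reading of the e2e setup [1], so double-check the exact key there, and the bucket is a placeholder):
```
s3.endpoint: http://flink-jdbc-test_graph-minio_1:9000   # a plain http(s) URL, not s3://
s3.path.style.access: true
s3.path-style-access: true                               # dashed variant from [1]
state.checkpoints.dir: s3://<bucket>/checkpoints         # state paths keep the s3:// scheme
```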




--

Arvid Heise | Senior Java Developer

Re: Difficulties with Minio state storage

Rex Fenley
Thanks, y'all,

Yangze,
> I've tried to use MinIO as the state backend and everything seems to work well
For clarity, I'm using the RocksDB state backend with MinIO as state storage.
> s3.endpoint: http://localhost:9000
Also, I'm doing everything from docker-compose, so localhost isn't going to work in my case.


Arvid,
> You definitely need to use an http endpoint.
I always receive errors like the following when I use http:
Caused by: java.util.concurrent.CompletionException: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'http'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
Whereas s3:// gives me Bad Request errors instead.
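
If I'm reading the hints right, the split would be that http belongs only in the endpoint, while the state paths themselves keep the s3:// scheme, something like (bucket name being a placeholder):
```
s3.endpoint: http://flink-jdbc-test_graph-minio_1:9000
state.savepoints.dir: s3://<bucket>/savepoints
```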

Thanks


--

Rex Fenley | Software Engineer - Mobile and Backend


Re: Difficulties with Minio state storage

Rex Fenley
Good news!

Eliminating
bsEnv.setStateBackend(
  new RocksDBStateBackend(
    "s3://flink-jdbc-test_graph-minio_1/data/checkpoints:9000",
    true
  )
)
and instead moving all of the configuration into FLINK_PROPERTIES, with the endpoint switched to http, seemed to do the trick!
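
For anyone who finds this later, the environment now looks roughly like this (the bucket name is a placeholder, and the state.* lines are my reconstruction from the standard Flink options, not a verbatim copy):
```
environment:
  - |
    FLINK_PROPERTIES=
    jobmanager.rpc.address: flink-jobmanager
    parallelism.default: 2
    s3.access-key: <key>
    s3.secret-key: <key>
    s3.path.style.access: true
    s3.endpoint: http://flink-jdbc-test_graph-minio_1:9000
    state.backend: rocksdb
    state.backend.incremental: true
    state.checkpoints.dir: s3://<bucket>/checkpoints
    state.savepoints.dir: s3://<bucket>/savepoints
```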

Thanks for all the help!



--

Rex Fenley | Software Engineer - Mobile and Backend