Hi,
We have a Flink Streaming application that uses S3 for storing checkpoints. We are not using 'regular' S3, but rather IBM Object Storage, which has an S3-compatible connector. We had quite some challenges in overriding the endpoint from the default s3.amazonaws.com to our internal IBM Object Storage endpoint. In 1.3.2, we managed to get this working by providing our own jets3t.properties file that overrode s3service.s3-endpoint (https://jets3t.s3.amazonaws.com/toolkit/configuration.html).

When upgrading to 1.4.0, we added a dependency on the flink-s3-fs-hadoop artifact. It seems that our jets3t.properties override is no longer relevant, since this artifact does not use the plain Hadoop implementation anymore.

Is there a way to override this default endpoint, or can we do this with the Presto implementation? Please note that if we provide the endpoint in the URL for the state backend, it simply appends s3.amazonaws.com to the URL - for example, s3://myobjectstorageendpoint.s3.amazonaws.com.

Are there any other solutions, such as 'rolling back' to the Hadoop implementation of S3?

Thanks,
Hayden
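P.S. For reference, the override that worked for us on 1.3.2 was along these lines - a minimal jets3t.properties sketch, with the endpoint host purely illustrative (the individual properties are documented on the JetS3t configuration page linked above):

    # point JetS3t at the internal endpoint instead of s3.amazonaws.com
    s3service.s3-endpoint=objectstorage.internal.example.com
    # use path-style access; non-AWS stores rarely support
    # virtual-host-style bucket DNS names
    s3service.disable-dns-buckets=true
    # set to false only if the internal endpoint is plain HTTP
    s3service.https-only=true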
Hi,
Did you try overriding that config, and it didn't work? That dependency is in fact still using the Hadoop S3 FS implementation, but it shades everything into our own namespace so that there can't be any version conflicts. If that doesn't work, then we need to look into this further.

The way you usually use this is by putting the flink-s3-fs-hadoop jar from the opt/ folder into the lib/ folder. I'm not sure including it as a dependency will work, but it might.

You also don't have to use the flink-s3-fs-hadoop dependency if the regular Hadoop S3 support worked for you before. It's only an additional option.

Best,
Aljoscha
I see that we can still use the other implementation, but we were hoping that we'd benefit from the bug fix in Flink 1.4.0 around the 'repeated' loading of configuration. I'll check with the old implementation and see if it still works.
We have also seen discussions of stocator, a more native connector that interfaces directly with IBM Object Storage, can be configured through hdfs-site.xml, and might speed things up.
Hi,
We are having a similar problem when trying to use Flink 1.4.0 with IBM Object Storage for reading and writing data. We followed https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html and the suggestion on https://issues.apache.org/jira/browse/FLINK-851.

We put the flink-s3-fs-hadoop jar from the opt/ folder into the lib/ folder and added the following configuration to flink-conf.yaml:

    s3.access-key: <ACCESS_KEY>
    s3.secret-key: <SECRET_KEY>
    s3.endpoint: s3.us-south.objectstorage.softlayer.net

With this we can read from IBM Object Storage without any problem, using:

    env.readTextFile("s3://flink-test/flink-test.txt");

But we are having problems when trying to write. We use a Kafka consumer to read from the bus, do some processing, and then save some data to Object Storage. When using

    stream.writeAsText("s3://flink-test/data.txt").setParallelism(1);

the file is created, but only when the job finishes (or we stop it). We need to save the data without stopping the job, so we are trying to use a sink. But when using a BucketingSink, we get this error:

    java.io.IOException: No FileSystem for scheme: s3
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1196)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)

Do you have any idea how we could make this work using a sink?

Thanks,
Regards,
Edward
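P.S. For concreteness, a minimal sketch of the kind of sink setup that hits this error - the output path and batch size here are illustrative:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

    // Given some DataStream<String> stream from the Kafka consumer.
    // BucketingSink resolves its base path through Hadoop's FileSystem,
    // which is where "No FileSystem for scheme: s3" is thrown.
    BucketingSink<String> sink = new BucketingSink<>("s3://flink-test/output");
    sink.setBatchSize(1024 * 1024); // roll part files after ~1 MB
    stream.addSink(sink);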
Hi Edward,
The problem here is that readTextFile() and writeAsText() use the Flink FileSystem abstraction underneath, which will pick up the S3 filesystem from opt/. The BucketingSink, on the other hand, uses the Hadoop FileSystem abstraction directly, meaning that there has to be some Hadoop FileSystem implementation for s3 on the classpath for this to work.

Also, the BucketingSink currently has some shortcomings when used with eventually consistent file systems such as S3. We are planning to solve those problems after releasing 1.5, and there is also an open PR that provides an alternative sink that works with those kinds of file systems: https://github.com/apache/flink/pull/4607

Best,
Aljoscha
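P.S. One way to give the BucketingSink a Hadoop filesystem is to use the s3a scheme instead - a sketch, assuming hadoop-aws and a matching AWS SDK are on the classpath; endpoint and keys are illustrative:

    <!-- core-site.xml: register a Hadoop FileSystem for the s3a:// scheme -->
    <configuration>
      <property>
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
      </property>
      <property>
        <name>fs.s3a.endpoint</name>
        <value>s3.us-south.objectstorage.softlayer.net</value>
      </property>
      <property>
        <!-- path-style access is often required by S3-compatible stores -->
        <name>fs.s3a.path.style.access</name>
        <value>true</value>
      </property>
      <property>
        <name>fs.s3a.access.key</name>
        <value>ACCESS_KEY</value>
      </property>
      <property>
        <name>fs.s3a.secret.key</name>
        <value>SECRET_KEY</value>
      </property>
    </configuration>

The sink base path would then be s3a://flink-test/output rather than s3://flink-test/output.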
Thanks, Aljoscha. That makes sense.
Do you have a more specific date for the changes to the BucketingSink and/or for the PR to be released?
Hi,
Unfortunately not yet, though it's high on my personal list of things I want to get resolved. It won't make it into 1.5.0, but I think it will be in 1.6.0.

Best,
Aljoscha
Hi Aljoscha,
Thinking a little bit more about this: although IBM Object Storage is compatible with Amazon's S3, it is not an eventually consistent file system, but rather immediately consistent. So we won't need the support for eventually consistent file systems for our use case to work; we would only need the BucketingSink to use the Flink FileSystem abstraction instead of using the Hadoop FileSystem abstraction directly.

Is this something that could be released earlier? Or do you have any idea how we could achieve it?

Regards,
Edward
Edward,
We are using Object Storage for checkpointing. I'd like to point out that we were seeing performance problems using the S3 protocol. By the way, we had quite a few problems using the flink-s3-fs-hadoop jar with Object Storage and had to do some ugly hacking to get it all working.

We recently discovered an alternative connector developed by IBM Research called stocator. It's a streaming writer and performs better than using the S3 protocol. Here is a link to the library - https://github.com/SparkTC/stocator - and a blog post explaining it - http://www.spark.tc/stocator-the-fast-lane-connecting-object-stores-to-spark/

Good luck!!
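P.S. A rough sketch of the kind of hdfs-site.xml (or core-site.xml) configuration the stocator README describes for COS - check the property names against the stocator docs for your version, and note the service name 'myCos' is purely illustrative:

    <property>
      <name>fs.stocator.scheme.list</name>
      <value>cos</value>
    </property>
    <property>
      <!-- register stocator as the Hadoop FileSystem for the cos:// scheme -->
      <name>fs.cos.impl</name>
      <value>com.ibm.stocator.fs.ObjectStoreFileSystem</value>
    </property>
    <property>
      <name>fs.stocator.cos.impl</name>
      <value>com.ibm.stocator.fs.cos.COSAPIClient</value>
    </property>
    <property>
      <name>fs.stocator.cos.scheme</name>
      <value>cos</value>
    </property>
    <property>
      <name>fs.cos.myCos.endpoint</name>
      <value>s3.us-south.objectstorage.softlayer.net</value>
    </property>
    <property>
      <name>fs.cos.myCos.access.key</name>
      <value>ACCESS_KEY</value>
    </property>
    <property>
      <name>fs.cos.myCos.secret.key</name>
      <value>SECRET_KEY</value>
    </property>

Paths then look like cos://mybucket.myCos/some/object.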
Hi Hayden,
It seems like a good alternative, but I see it's intended to work with Spark - did you manage to get it working with Flink? I ran some tests, but I get several errors when trying to create a file, either for checkpointing or for saving data.

Thanks in advance,
Regards,
Edward
We actually got it working. Essentially, it's an implementation of Hadoop's FileSystem, and it was written with the idea that it can be used with Spark (since Spark has broader adoption than Flink as of now). We managed to get it configured, and found the latency to be much lower than with the S3 connector. There are far fewer copy operations happening under the hood when using this native API, which explains the better performance.
Happy to provide assistance offline if you're interested.

Thanks,
Hayden
A heads up on this front:

- For state backends during checkpointing, I would suggest using flink-s3-fs-presto, which is quite a bit faster than flink-s3-fs-hadoop because it avoids a bunch of unnecessary metadata operations.

- We have started work on rewriting the BucketingSink to make it work with the shaded S3 filesystems (like flink-s3-fs-presto). We are also adding a more powerful internal abstraction that uses multipart uploads for faster incremental persistence of result chunks on checkpoints. This should be in 1.6 - happy to share more as soon as it is out...
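For anyone wiring this up, a minimal flink-conf.yaml sketch for checkpointing to an S3-compatible store via flink-s3-fs-presto (after copying the jar from opt/ to lib/) - the endpoint and path are illustrative, and the state backend keys are the Flink 1.4-era names:

    # credentials and endpoint picked up by the shaded S3 filesystem
    s3.access-key: ACCESS_KEY
    s3.secret-key: SECRET_KEY
    s3.endpoint: s3.us-south.objectstorage.softlayer.net

    # filesystem state backend writing checkpoints to the object store
    state.backend: filesystem
    state.backend.fs.checkpointdir: s3://flink-test/checkpoints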