Hi,
We are using Flink to process clickstream data from Kafka and write it to S3 in 128 MB files. What message-processing guarantees does the S3 sink provide? As I understand it, the S3A client buffers data in memory or on disk. If a particular node fails, the TM never calls Writer#close, so the buffered data can be lost entirely, even when that buffer contains data covered by the last successful checkpoint.

--
Thanks,
Amit
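Hi Amit,

Can you elaborate on how you write using the "S3 sink" and which version of Flink you are using?

If you are using BucketingSink [1], you can check out the API doc and configure it to flush before closing your sink. This way your sink is "integrated with the checkpointing mechanism to provide exactly once semantics" [2].

Thanks,
Rong

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html
[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html

On Thu, May 17, 2018 at 2:57 AM, Amit Jain <[hidden email]> wrote:
> Hi,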
Hi Rong,
We are using BucketingSink only. I'm asking about the case where the TM does not get a chance to call Writer#flush, for example when YARN kills the TM because of an OOM. We have set fs.s3.impl to com.amazon.ws.emr.hadoop.fs.EmrFileSystem in core-site.xml, so BucketingSink uses the S3 client internally.

When we write data through the S3A client, it buffers the data in memory or on disk until the buffer reaches the multipart file size or close is called on the OutputStream. Now suppose the S3A client has buffered 40 MB of data on the TM's local disk when a checkpoint barrier arrives at the sink, and the checkpoint completes successfully. The sink resumes writing, the buffer grows to 60 MB, and then YARN kills the TM. What happens to the original 40 MB of data?

--
Thanks,
Amit

On Thu, May 17, 2018 at 10:28 PM, Rong Rong <[hidden email]> wrote:
> Hi Amit,
>
> Can you elaborate how you write using "S3 sink" and which version of Flink
> you are using?
>
> If you are using BucketingSink[1], you can checkout the API doc and
> configure to flush before closing your sink.
> This way your sink is "integrated with the checkpointing mechanism to
> provide exactly once semantics"[2]
>
> Thanks,
> Rong
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
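The failure scenario in Amit's reply (40 MB buffered at the checkpoint, 60 MB buffered when the TM is killed) can be sketched as a small simulation. This is only a toy model under stated assumptions: `BufferingObjectStream` and `simulate` are hypothetical names, not part of Flink or the S3A client, the part size is an arbitrary illustrative value, and `flush` is modelled as not uploading anything, which is the behaviour Amit describes rather than something verified here.

```python
MB = 1024 * 1024


class BufferingObjectStream:
    """Toy model (hypothetical, not the S3A API): bytes are held locally and
    become durable only when a full part is uploaded or close() is called."""

    def __init__(self, part_size):
        self.part_size = part_size
        self.buffered = 0  # bytes held in local memory or on local disk
        self.durable = 0   # bytes that have actually reached the object store

    def write(self, nbytes):
        self.buffered += nbytes
        # Only complete parts are uploaded; the remainder stays local.
        while self.buffered >= self.part_size:
            self.buffered -= self.part_size
            self.durable += self.part_size

    def flush(self):
        # Assumption: flushing does not push buffered bytes to the store.
        pass

    def close(self):
        # A clean close uploads whatever is still buffered.
        self.durable += self.buffered
        self.buffered = 0


def simulate(killed, part_size=128 * MB):
    """Return (durable_bytes, checkpointed_bytes_lost) for the scenario."""
    stream = BufferingObjectStream(part_size)

    stream.write(40 * MB)
    stream.flush()
    checkpointed = 40 * MB  # checkpoint completes; these records are not replayed

    stream.write(20 * MB)   # buffer grows to 60 MB

    if not killed:
        stream.close()      # normal shutdown path
    # If killed, close() never runs and the local buffer is simply gone.

    durable = stream.durable
    lost = max(0, checkpointed - durable)
    return durable, lost


if __name__ == "__main__":
    for killed in (True, False):
        durable, lost = simulate(killed)
        print(f"killed={killed}: durable={durable // MB} MB, lost={lost // MB} MB")
```

Under these assumptions, the killed run leaves nothing durable and loses the 40 MB that the checkpoint already covers, while a clean close stores all 60 MB. The 20 MB written after the checkpoint is not counted as lost in either case, because a restore from the checkpoint would replay it from Kafka.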
Hi Amit,

The BucketingSink doesn't have well-defined semantics when used with S3. Data loss is possible, but I am not sure whether it is the only problem. There are plans to rewrite the BucketingSink in Flink 1.6 to support eventually consistent file systems [1][2].

Best,
Gary

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/sink-with-BucketingSink-to-S3-files-override-td18433.html
[2] https://issues.apache.org/jira/browse/FLINK-6306
Thanks Gary!
Sure, there are issues with updates in S3. You may want to look at the EMRFS consistent view guarantees [1]. I'm not sure whether that is possible on a non-EMR AWS system. I'm creating a JIRA issue about the possibility of data loss with S3. IMHO, the Flink docs should mention the possible data loss with S3.

[1] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html

--
Thanks,
Amit

On Fri, May 18, 2018 at 2:48 AM, Gary Yao <[hidden email]> wrote:
> Hi Amit,
>
> The BucketingSink doesn't have well defined semantics when used with S3. Data
> loss is possible but I am not sure whether it is the only problem. There are
> plans to rewrite the BucketingSink in Flink 1.6 to enable eventually consistent
> file systems [1][2].
>
> Best,
> Gary
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/sink-with-BucketingSink-to-S3-files-override-td18433.html
> [2] https://issues.apache.org/jira/browse/FLINK-6306