Message guarantees with S3 Sink

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Message guarantees with S3 Sink

Amit Jain
Hi,

We are using Flink to process click stream data from Kafka and pushing
the same in 128MB file in S3.

What is the message processing guarantees with S3 sink? In my
understanding, S3A client buffers the data on memory/disk. In failure
scenario on particular node, TM would not trigger Writer#close hence
buffered data can lose entirely assuming this buffer contains data of
last successful checkpointing.

--
Thanks,
Amit
Reply | Threaded
Open this post in threaded view
|

Re: Message guarantees with S3 Sink

Rong Rong
Hi Amit,

Can you elaborate how you write using "S3 sink" and which version of Flink you are using?

If you are using BucketingSink[1], you can checkout the API doc and configure to flush before closing your sink.
This way your sink is "integrated with the checkpointing mechanism to provide exactly once semantics"[2]

Thanks,
Rong

On Thu, May 17, 2018 at 2:57 AM, Amit Jain <[hidden email]> wrote:
Hi,

We are using Flink to process click stream data from Kafka and pushing
the same in 128MB file in S3.

What is the message processing guarantees with S3 sink? In my
understanding, S3A client buffers the data on memory/disk. In failure
scenario on particular node, TM would not trigger Writer#close hence
buffered data can lose entirely assuming this buffer contains data of
last successful checkpointing.

--
Thanks,
Amit

Reply | Threaded
Open this post in threaded view
|

Re: Message guarantees with S3 Sink

Amit Jain
Hi Rong,

We are using BucketingSink only. I'm looking for the case where TM
does not get the chance to call Writer#flush like YARN killed the TM
because of OOM. We have configured fs.s3.impl to
com.amazon.ws.emr.hadoop.fs.EmrFileSystem in core-site.xml, so
BucketingSink is using S3 client internally.

When we write data using S3A client, it buffers up the data in memory
or disk until it hit multipart file size or call to close of
OutputStream happens. Now suppose, S3A client buffers up 40MB data in
TM's local disk and same time checkpoint barrier comes in at Sink and
got successfully completed. Write process in sink resumes and now
buffer data size reaches to 60MB and now YARN killed the TM. What
would happen to original 40MB of data ?

--
Thanks,
Amit




On Thu, May 17, 2018 at 10:28 PM, Rong Rong <[hidden email]> wrote:

> Hi Amit,
>
> Can you elaborate how you write using "S3 sink" and which version of Flink
> you are using?
>
> If you are using BucketingSink[1], you can checkout the API doc and
> configure to flush before closing your sink.
> This way your sink is "integrated with the checkpointing mechanism to
> provide exactly once semantics"[2]
>
> Thanks,
> Rong
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>
> On Thu, May 17, 2018 at 2:57 AM, Amit Jain <[hidden email]> wrote:
>>
>> Hi,
>>
>> We are using Flink to process click stream data from Kafka and pushing
>> the same in 128MB file in S3.
>>
>> What is the message processing guarantees with S3 sink? In my
>> understanding, S3A client buffers the data on memory/disk. In failure
>> scenario on particular node, TM would not trigger Writer#close hence
>> buffered data can lose entirely assuming this buffer contains data of
>> last successful checkpointing.
>>
>> --
>> Thanks,
>> Amit
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Message guarantees with S3 Sink

Gary Yao-2
In reply to this post by Amit Jain
Hi Amit,

The BucketingSink doesn't have well defined semantics when used with S3. Data
loss is possible but I am not sure whether it is the only problem. There are
plans to rewrite the BucketingSink in Flink 1.6 to enable eventually consistent
file systems [1][2].

Best,
Gary


[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/sink-with-BucketingSink-to-S3-files-override-td18433.html
[2] https://issues.apache.org/jira/browse/FLINK-6306

On Thu, May 17, 2018 at 11:57 AM, Amit Jain <[hidden email]> wrote:
Hi,

We are using Flink to process click stream data from Kafka and pushing
the same in 128MB file in S3.

What is the message processing guarantees with S3 sink? In my
understanding, S3A client buffers the data on memory/disk. In failure
scenario on particular node, TM would not trigger Writer#close hence
buffered data can lose entirely assuming this buffer contains data of
last successful checkpointing.

--
Thanks,
Amit

Reply | Threaded
Open this post in threaded view
|

Re: Message guarantees with S3 Sink

Amit Jain
Thanks Gary!

Sure, there are issues with updates in S3. You may want to look over
EMRFS guarantees of the consistent view [1]. I'm not sure, is it
possible in non-EMR AWS system or not.

I'm creating a JIRA issue regarding data loss possibility in S3. IMHO,
Flink docs should mention about possible data loss in S3.

[1] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html

--
Thanks,
Amit

On Fri, May 18, 2018 at 2:48 AM, Gary Yao <[hidden email]> wrote:

> Hi Amit,
>
> The BucketingSink doesn't have well defined semantics when used with S3.
> Data
> loss is possible but I am not sure whether it is the only problem. There are
> plans to rewrite the BucketingSink in Flink 1.6 to enable eventually
> consistent
> file systems [1][2].
>
> Best,
> Gary
>
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/sink-with-BucketingSink-to-S3-files-override-td18433.html
> [2] https://issues.apache.org/jira/browse/FLINK-6306
>
> On Thu, May 17, 2018 at 11:57 AM, Amit Jain <[hidden email]> wrote:
>>
>> Hi,
>>
>> We are using Flink to process click stream data from Kafka and pushing
>> the same in 128MB file in S3.
>>
>> What is the message processing guarantees with S3 sink? In my
>> understanding, S3A client buffers the data on memory/disk. In failure
>> scenario on particular node, TM would not trigger Writer#close hence
>> buffered data can lose entirely assuming this buffer contains data of
>> last successful checkpointing.
>>
>> --
>> Thanks,
>> Amit
>
>