Flink StreamingFileSink part file behavior

Flink StreamingFileSink part file behavior

amran dean
Hello,
I am using StreamingFileSink and KafkaConsumer010 as a Kafka -> S3 connector (Flink 1.8.1, Kafka 0.10.1).

The setup is simple:
Data is bucketed first by datetime (granularity of 1 day), then by Kafka partition.
I am using event time (Kafka timestamp, recorded at the time of creation at the producer) to do the bucketing.

E.g:
s3://some_topic/dt=2019-10-21/partition_0/part-0-0

Suppose a Kafka record timestamped 10/23 23:57 was written out of order (relative to its timestamp) due to network delay. In the Kafka partition, it sits next to messages timestamped 10/24 00:01.

Will Flink correctly write this record to bucket:
dt=2019-10-23

Or will it be written to bucket dt=2019-10-24? To phrase the question a bit differently: does Flink know when to "close" bucket dt=2019-10-23 and move on to the next datetime bucket? Or is Flink able to write to arbitrary datetime buckets as messages are read out of order with respect to their Kafka timestamps?

What if the delay was even longer, say 4 hours?
Re: Flink StreamingFileSink part file behavior

Paul Lam
Hi,

StreamingFileSink can write to many buckets at the same time, and it uses BucketAssigner to determine the Bucket for each record.

With regard to your questions: the records would be written to the expected bucket even if they arrive out of order.
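
As a rough illustration of why arrival order does not matter: the bucket is derived per record, so a function of the record's own timestamp and partition always yields the same bucket, no matter which records were read before it. The sketch below is plain Java with no Flink dependency; the class name BucketIdSketch, the method bucketIdFor, and the use of UTC for the day boundary are assumptions made for illustration, not something from the original setup. In a real job, logic like this would presumably live in the getBucketId method of a custom BucketAssigner.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: computes a bucket path from a
// record's event timestamp and Kafka partition only.
final class BucketIdSketch {
    // Assumption: day boundaries are taken in UTC.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // The result depends only on the record itself, never on previously seen records.
    static String bucketIdFor(long timestampMillis, int kafkaPartition) {
        return "dt=" + DAY.format(Instant.ofEpochMilli(timestampMillis))
                + "/partition_" + kafkaPartition;
    }

    public static void main(String[] args) {
        long next = Instant.parse("2019-10-24T00:01:00Z").toEpochMilli();
        long late = Instant.parse("2019-10-23T23:57:00Z").toEpochMilli();
        // The 10/24 record is read first, then the delayed 10/23 record.
        System.out.println(bucketIdFor(next, 0)); // dt=2019-10-24/partition_0
        System.out.println(bucketIdFor(late, 0)); // dt=2019-10-23/partition_0
    }
}
```

The same reasoning applies to a delay of 4 hours: the computed bucket is still dt=2019-10-23. As far as I understand, if that bucket has no open part file by then, the sink simply starts a new part file in it.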

You can refer to [1] for more information.


Best,
Paul Lam
