Hi my friends,
I use the FileSystem connector in Flink SQL, and I found that my success file is committed late, sometimes by dozens of minutes. Here is some information:
1. Flink version: 1.11.1.
2. Source DDL:
create table test (
`timestamp` bigint,
event_time as _timestamp(`timestamp`),
WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE
)...
3. Sink DDL:
create table sinkTest (
xxx
dtm VARCHAR,
hh VARCHAR
) PARTITIONED BY (dtm, hh) with (
'connector' = 'filesystem',
'format' = 'parquet',
'parquet.compression' = 'SNAPPY',
'sink.rolling-policy.file-size' = '512MB',
'sink.rolling-policy.check-interval' = '10 min',
'sink.partition-commit.trigger' = 'partition-time',
'sink.partition-commit.delay' = '1 h',
'sink.partition-commit.policy.kind' = 'success-file',
'sink.file-suffix' = '.parquet',
'partition.time-extractor.timestamp-pattern' = '$dtm $hh:00:00'
)
4. The checkpoint interval of the job is 5 minutes, and all checkpoints succeed.

If my job is not lagging, I expect a success file to be written at around 10 minutes past every hour (for the previous hour's partition), but in practice it is written much later.

Here is the source code that decides when a partition, and therefore its success file, can be committed: a partition is committable once the watermark is greater than the partition time plus the commit delay.
public List<String> committablePartitions(long checkpointId) {

Best,
Forideal
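The commit condition described above (watermark > partition time + delay) can be checked with a small, hypothetical sketch. It is not Flink's implementation: the class and method names are invented, and it assumes `dtm` looks like `2021-05-07`, `hh` is zero-padded, and partition times are interpreted as UTC. With the thread's settings (1 h delay, watermark 10 minutes behind event time) it computes the event time the stream must pass before partition `hh=10` becomes committable.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical sketch of the partition-time commit condition; names are illustrative. */
public class PartitionCommitSketch {

    // Mirrors 'partition.time-extractor.timestamp-pattern' = '$dtm $hh:00:00',
    // assuming dtm is formatted like 2021-05-07 and hh is zero-padded (e.g. "09").
    private static final DateTimeFormatter PARTITION_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Partition time in epoch millis; UTC is assumed to keep the example simple. */
    static long partitionTimeMillis(String dtm, String hh) {
        LocalDateTime t = LocalDateTime.parse(dtm + " " + hh + ":00:00", PARTITION_TIME);
        return t.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /** A partition is committable once watermark > partition time + commit delay. */
    static boolean isCommittable(long watermarkMillis, String dtm, String hh, Duration delay) {
        return watermarkMillis > partitionTimeMillis(dtm, hh) + delay.toMillis();
    }

    /**
     * Event time the stream must pass so that the watermark (event time minus
     * watermarkLag) passes partition time + delay.
     */
    static LocalDateTime commitThresholdEventTime(String dtm, String hh, Duration delay,
                                                  Duration watermarkLag) {
        long millis = partitionTimeMillis(dtm, hh) + delay.toMillis() + watermarkLag.toMillis();
        return LocalDateTime.ofEpochSecond(millis / 1000, 0, ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        Duration delay = Duration.ofHours(1);           // 'sink.partition-commit.delay' = '1 h'
        Duration watermarkLag = Duration.ofMinutes(10); // event_time - INTERVAL '10' MINUTE

        // Partition dtm=2021-05-07/hh=10 needs event time beyond 11:10.
        System.out.println(commitThresholdEventTime("2021-05-07", "10", delay, watermarkLag)); // 2021-05-07T11:10

        long wm1100 = partitionTimeMillis("2021-05-07", "11"); // watermark exactly at 11:00
        System.out.println(isCommittable(wm1100, "2021-05-07", "10", delay));          // false (not strictly greater)
        System.out.println(isCommittable(wm1100 + 60_000, "2021-05-07", "10", delay)); // true
    }
}
```

Because the trigger is evaluated per checkpoint (it takes a `checkpointId`), under these assumptions the success file for `hh=10` would be expected at the first completed checkpoint after event time passes 11:10, i.e. roughly between 11:10 and 11:15 with a 5-minute checkpoint interval, as long as the job keeps up.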
I found the reason. The "Late data processing" section of the filesystem connector documentation says: "The record will be written into its partition when a record is supposed to be written into a partition that has already been committed, and then the committing of this partition will be triggered again." So late records re-trigger the commit of partitions that were already committed, which is why the success file appears to be updated late.

Best,
Forideal

At 2021-05-07 19:41:45, "forideal" <[hidden email]> wrote:
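The late-data behavior quoted above can be illustrated with a small, hypothetical simulation: a record is always written into its partition, which marks that partition as pending again, and the next checkpoint whose watermark satisfies the commit condition commits it (and rewrites its success file) a second time. Times are plain minute counts, a partition is identified by the minute its hour starts, and all class and method names are invented for illustration; this is not Flink's code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, heavily simplified model of partition-time commits with late data. */
public class LateDataRecommitSketch {

    private final long delayMinutes;
    // Partitions that received data since they were last committed.
    private final Set<Long> pendingPartitions = new LinkedHashSet<>();
    // Every success-file write, in order, so repeated commits are visible.
    final List<Long> successFileWrites = new ArrayList<>();

    LateDataRecommitSketch(long delayMinutes) {
        this.delayMinutes = delayMinutes;
    }

    /** Marks the record's hourly partition as pending, even if it was already committed. */
    void write(long eventMinute) {
        long partitionStart = eventMinute - (eventMinute % 60);
        pendingPartitions.add(partitionStart);
    }

    /** On a completed checkpoint, commits every pending partition with watermark > start + delay. */
    void onCheckpoint(long watermarkMinute) {
        Iterator<Long> it = pendingPartitions.iterator();
        while (it.hasNext()) {
            long partitionStart = it.next();
            if (watermarkMinute > partitionStart + delayMinutes) {
                successFileWrites.add(partitionStart); // (re)write the success file
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        LateDataRecommitSketch sink = new LateDataRecommitSketch(60);
        sink.write(630);        // on-time record for the hour [600, 660)
        sink.onCheckpoint(665); // watermark passed 600 + 60: first commit
        sink.write(645);        // late record for the same, already committed hour
        sink.onCheckpoint(700); // the same hour is committed again
        System.out.println(sink.successFileWrites); // [600, 600]
    }
}
```

In this model each late record for an already committed hour causes another success-file write, so the file's modification time follows the latest late data rather than the first commit, which is consistent with the explanation found in the thread.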
Hi, forideal

Best,
Leonard

[1] https://issues.apache.org/jira/browse/FLINK-22472