FileSystem Connector's Success File Was Submitted Late

FileSystem Connector's Success File Was Submitted Late

forideal
Hi friends,
    I use the FileSystem connector in Flink SQL, and I found that my success file is committed late, by tens of minutes.
    Here is some information:
    1. The Flink version is 1.11.1.
    2. Source DDL:
       create table test (
         `timestamp` bigint,
         event_time as _timestamp(`timestamp`),
         WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE
       )...
    3. Sink DDL:
       create table sinkTest (
         xxx
         dtm VARCHAR,
         hh VARCHAR
       ) PARTITIONED BY (dtm, hh)
       with (
         'connector' = 'filesystem',
         'format' = 'parquet',
         'parquet.compression' = 'SNAPPY',
         'sink.rolling-policy.file-size' = '512MB',
         'sink.rolling-policy.check-interval' = '10 min',
         'sink.partition-commit.trigger' = 'partition-time',
         'sink.partition-commit.delay' = '1 h',
         'sink.partition-commit.policy.kind' = 'success-file',
         'sink.file-suffix' = '.parquet',
         'partition.time-extractor.timestamp-pattern' = '$dtm $hh:00:00'
       )

    4. The job's checkpoint interval is 5 minutes, and all checkpoints succeed.

I would expect that, if the job is not lagging, the success file for each hourly partition would be committed about 10 minutes past the hour, but in practice it is committed much later.
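The expectation above can be checked with a little arithmetic. The following is only a sketch under stated assumptions: the class and method names are made up for illustration, the partition date is an arbitrary example, and it assumes records arrive in real time and checkpoints complete promptly. With a watermark of event_time minus 10 minutes and a commit delay of 1 hour, the watermark passes "partition start + 1 h" once event time passes "partition start + 1 h 10 min"; since the commit is evaluated per checkpoint, up to one checkpoint interval may be added on top.

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Worked arithmetic for the partition-time commit trigger (illustrative names only). */
final class CommitTimingSketch {

    /**
     * Event time that records must pass before the watermark exceeds
     * partitionStart + commitDelay, given watermark = event_time - watermarkLag.
     */
    static LocalDateTime eventTimeNeeded(
            LocalDateTime partitionStart, Duration commitDelay, Duration watermarkLag) {
        return partitionStart.plus(commitDelay).plus(watermarkLag);
    }

    public static void main(String[] args) {
        // Example partition dtm=2021-05-07, hh=10 (an arbitrary date chosen for illustration).
        LocalDateTime partitionStart = LocalDateTime.of(2021, 5, 7, 10, 0);
        Duration commitDelay = Duration.ofHours(1);        // 'sink.partition-commit.delay' = '1 h'
        Duration watermarkLag = Duration.ofMinutes(10);    // event_time - INTERVAL '10' MINUTE
        Duration checkpointInterval = Duration.ofMinutes(5);

        LocalDateTime needed = eventTimeNeeded(partitionStart, commitDelay, watermarkLag);
        System.out.println(needed);                          // prints 2021-05-07T11:10
        // Assumed upper bound: the next checkpoint after the watermark has passed the threshold.
        System.out.println(needed.plus(checkpointInterval)); // prints 2021-05-07T11:15
    }
}
```

Under these assumptions the success file for the 10:00 partition would be expected between roughly 11:10 and 11:15, which matches the "about 10 minutes past the hour" expectation.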
Here is the source code that decides when the success file is committed. A partition is committed once the watermark is greater than the partition time plus the commit delay.
public List<String> committablePartitions(long checkpointId) {
    if (!watermarks.containsKey(checkpointId)) {
        throw new IllegalArgumentException(String.format(
            "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
            checkpointId, watermarks));
    }

    long watermark = watermarks.get(checkpointId);
    watermarks.headMap(checkpointId, true).clear();

    List<String> needCommit = new ArrayList<>();
    Iterator<String> iter = pendingPartitions.iterator();
    while (iter.hasNext()) {
        String partition = iter.next();
        LocalDateTime partTime = extractor.extract(
            partitionKeys, extractPartitionValues(new Path(partition)));
        if (watermark > toMills(partTime) + commitDelay) {
            needCommit.add(partition);
            iter.remove();
        }
    }
    return needCommit;
}
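
To see the boundary behaviour of the method above in isolation, here is a simplified, self-contained stand-in. It is not Flink's class: the class name, the `addPartition` and `snapshotWatermark` helpers, and the `partitionStartMs` parser for paths shaped like `dtm=2021-05-07/hh=10` are all hypothetical, and interpreting the partition time as UTC is an assumption of this sketch.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

/** Simplified stand-in for the quoted trigger logic; not Flink's actual class. */
final class PartitionTimeTriggerSketch {
    private final TreeMap<Long, Long> watermarks = new TreeMap<>(); // checkpointId -> watermark (ms)
    private final Set<String> pendingPartitions = new LinkedHashSet<>();
    private final long commitDelayMs;

    PartitionTimeTriggerSketch(long commitDelayMs) {
        this.commitDelayMs = commitDelayMs;
    }

    void addPartition(String partition) {
        pendingPartitions.add(partition);
    }

    void snapshotWatermark(long checkpointId, long watermarkMs) {
        watermarks.put(checkpointId, watermarkMs);
    }

    /** Hypothetical parser: "dtm=2021-05-07/hh=10" -> epoch millis of 2021-05-07T10:00:00 (UTC assumed). */
    static long partitionStartMs(String partition) {
        String[] parts = partition.split("/");
        String dtm = parts[0].substring("dtm=".length());
        String hh = parts[1].substring("hh=".length());
        return LocalDateTime.parse(dtm + "T" + hh + ":00:00")
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    List<String> committablePartitions(long checkpointId) {
        if (!watermarks.containsKey(checkpointId)) {
            throw new IllegalArgumentException("Checkpoint " + checkpointId + " has no watermark");
        }
        long watermark = watermarks.get(checkpointId);
        // Drop this and all earlier checkpoint entries, as in the quoted code.
        watermarks.headMap(checkpointId, true).clear();

        List<String> needCommit = new ArrayList<>();
        Iterator<String> iter = pendingPartitions.iterator();
        while (iter.hasNext()) {
            String partition = iter.next();
            // Strictly greater: a watermark equal to the threshold does not commit.
            if (watermark > partitionStartMs(partition) + commitDelayMs) {
                needCommit.add(partition);
                iter.remove();
            }
        }
        return needCommit;
    }

    public static void main(String[] args) {
        long oneHourMs = 3_600_000L;
        PartitionTimeTriggerSketch trigger = new PartitionTimeTriggerSketch(oneHourMs);
        trigger.addPartition("dtm=2021-05-07/hh=10");
        long start = partitionStartMs("dtm=2021-05-07/hh=10");
        trigger.snapshotWatermark(1L, start + oneHourMs);      // watermark exactly at the threshold
        trigger.snapshotWatermark(2L, start + oneHourMs + 1);  // watermark just past the threshold
        System.out.println(trigger.committablePartitions(1L)); // prints []
        System.out.println(trigger.committablePartitions(2L)); // prints [dtm=2021-05-07/hh=10]
    }
}
```

The sketch makes two points visible: the comparison is strict, and a partition is only evaluated when a checkpoint with a recorded watermark is processed, so the commit can trail the watermark by up to one checkpoint interval.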
Best,
Forideal


 

Re: FileSystem Connector's Success File Was Submitted Late

forideal
I found the reason:

   Late data processing: when a record is supposed to be written into a partition that has already been committed, the record is still written into that partition, and the commit of that partition is then triggered again.
So what I was seeing is the success file being written again after late data arrived, which makes it look as if it was committed late.
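
The effect described above can be illustrated with a toy model of a single partition. This is not Flink code: the class, its methods, and the minute-based clock are invented for illustration, and it assumes that any write to a partition marks it as pending again, which is how the behaviour quoted above would play out.

```java
/** Toy single-partition model of a commit being re-triggered by late data; not Flink code. */
final class LateDataRecommitSketch {
    private final long commitThresholdMinute; // partition start + commit delay, in minutes of the day
    private boolean pending = false;
    private long successFileWrittenAt = -1;   // wall-clock minute of the latest commit, -1 if none

    LateDataRecommitSketch(long commitThresholdMinute) {
        this.commitThresholdMinute = commitThresholdMinute;
    }

    /** Any record written to the partition, late or not, marks it as pending (assumption). */
    void write() {
        pending = true;
    }

    /** On a checkpoint, commit the partition if it is pending and the watermark has passed the threshold. */
    void checkpoint(long wallClockMinute, long watermarkMinute) {
        if (pending && watermarkMinute > commitThresholdMinute) {
            successFileWrittenAt = wallClockMinute; // the success file is written (or rewritten)
            pending = false;
        }
    }

    long successFileWrittenAt() {
        return successFileWrittenAt;
    }

    public static void main(String[] args) {
        // Partition hh=10 starts at minute 600; with a 1 h delay the threshold is minute 660.
        LateDataRecommitSketch partition = new LateDataRecommitSketch(660);

        partition.write();                                    // on-time data
        partition.checkpoint(675, 665);                       // wall clock 11:15, watermark 11:05
        System.out.println(partition.successFileWrittenAt()); // prints 675

        partition.write();                                    // a late record for hh=10 arrives
        partition.checkpoint(705, 695);                       // wall clock 11:45, watermark 11:35
        System.out.println(partition.successFileWrittenAt()); // prints 705
    }
}
```

In this model the partition was first committed at 11:15, but anyone inspecting the success file after the late record sees a timestamp of 11:45, which is consistent with the "dozens of minutes late" observation.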

Best,
Forideal


In reply to the original message from forideal, sent 2021-05-07 19:41:45.


Re: FileSystem Connector's Success File Was Submitted Late

Leonard Xu
Hi forideal,

I also ran into this problem and opened an issue [1]; please take a look.

Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-22472



In reply to the message from forideal, sent May 7, 2021, 20:31.