Flink 1.9.1 KafkaConnector missing data (1M+ records)

Harrison Xu
Hello,

We're seeing some strange behavior with Flink's KafkaConnector010 (Kafka 0.10.1.1) arbitrarily skipping data.

Context
KafkaConnector010 is used as the source and StreamingFileSink/BulkPartWriter (S3) as the sink, with no intermediate operators. Recently, we noticed that millions of Kafka records were missing for one topic partition (the job reads 100+ topic partitions, and this behavior was observed for only one). The job runs on YARN, and the hosts were healthy with no hardware faults observed. There were no exceptions in the JobManager or TaskManager logs around that time.

How was this detected?
As a sanity check, we dual-write Kafka metadata (offsets) to a separate location in S3, and have monitoring to ensure that written offsets are contiguous with no duplicates.
Each Kafka record is bucketed into hourly datetime partitions (UTC) in S3. 
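
A minimal sketch of the kind of contiguity check described above, assuming the written offsets for one topic partition can be loaded as an array of longs. The names `OffsetGapChecker`, `Report`, and `check` are hypothetical and are not taken from the monitoring job itself:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: reports gaps and duplicates in the offsets written for one partition. */
final class OffsetGapChecker {

    /** Result holder; each missing range is an inclusive {from, to} pair. */
    static final class Report {
        final List<long[]> missingRanges = new ArrayList<>();
        final List<Long> duplicates = new ArrayList<>();
    }

    static Report check(long[] offsets) {
        // Sort a copy so the caller's array is left untouched.
        long[] sorted = offsets.clone();
        Arrays.sort(sorted);

        Report report = new Report();
        for (int i = 1; i < sorted.length; i++) {
            long prev = sorted[i - 1];
            long cur = sorted[i];
            if (cur == prev) {
                // Same offset seen again: a duplicate write.
                report.duplicates.add(cur);
            } else if (cur > prev + 1) {
                // Offsets prev+1 .. cur-1 were never written.
                report.missingRanges.add(new long[] {prev + 1, cur - 1});
            }
        }
        return report;
    }

    public static void main(String[] args) {
        Report r = check(new long[] {100, 101, 102, 102, 106, 107});
        for (long[] range : r.missingRanges) {
            System.out.println("missing offsets " + range[0] + ".." + range[1]);
        }
        System.out.println("duplicates " + r.duplicates);
    }
}
```

With the sample input in `main`, the check reports one missing range (103..105) and one duplicate (102).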

(Condensed) Taskmanager logs
2019-11-24 02:36:50,140 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5252 with MPU ID 3XG...
2019-11-24 02:41:27,966 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5253 with MPU ID 9MW...
2019-11-24 02:46:29,153 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5254 with MPU ID 7AP...
2019-11-24 02:51:32,602 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5255 with MPU ID xQU...
2019-11-24 02:56:35,183 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5256 with MPU ID pDL...
2019-11-24 03:01:26,059 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5257 with MPU ID Itf...
2019-11-24 03:01:26,510 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5263 with MPU ID e3l...
2019-11-24 03:06:26,230 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5264 with MPU ID 5z4...
2019-11-24 03:11:22,711 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5265 with MPU ID NfP...

Two observations stand out from the above logs:
- Datetime 2019-11-24T01 and 2019-11-24T02 are entirely skipped, resulting in millions of missing offsets. They are never written in future commits (and data in S3 shows this).
- Two commits for the same topic partition ("digest_features", partition 4) happened nearly simultaneously at 2019-11-24 03:01, despite our commit interval being set to 5 minutes. Why was the same TopicPartition read from and committed twice in such a short interval?
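
To make the second observation easier to reproduce on full logs, a scan along these lines could flag bucket changes and jumps in the part counter between consecutive S3Committer lines. This is only a sketch: `CommitLogScanner` and `findAnomalies` are hypothetical names, the regular expression is inferred from the log lines quoted above, and it assumes the input contains commits from a single subtask in log order. Because the logs above are condensed, a counter jump may simply reflect omitted lines.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: scans S3Committer log lines for bucket changes and part-counter jumps. */
final class CommitLogScanner {

    // group(1) = bucket path, group(2) = subtask index, group(3) = part counter.
    // The pattern is an assumption based on the log lines quoted in this thread.
    private static final Pattern COMMIT =
        Pattern.compile("Committing (\\S+)/part-(\\d+)-(\\d+) with MPU ID");

    static List<String> findAnomalies(List<String> logLines) {
        List<String> findings = new ArrayList<>();
        String prevBucket = null;
        long prevCounter = -1;
        for (String line : logLines) {
            Matcher m = COMMIT.matcher(line);
            if (!m.find()) {
                continue; // not a commit line
            }
            String bucket = m.group(1);
            long counter = Long.parseLong(m.group(3));
            if (prevBucket != null) {
                if (!bucket.equals(prevBucket)) {
                    findings.add("bucket changed: " + prevBucket + " -> " + bucket);
                }
                if (counter != prevCounter + 1) {
                    findings.add("part counter jumped: " + prevCounter + " -> " + counter);
                }
            }
            prevBucket = bucket;
            prevCounter = counter;
        }
        return findings;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "2019-11-24 03:01:26,059 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5257 with MPU ID Itf...",
            "2019-11-24 03:01:26,510 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5263 with MPU ID e3l...",
            "2019-11-24 03:06:26,230 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5264 with MPU ID 5z4...");
        findAnomalies(lines).forEach(System.out::println);
    }
}
```

On the three lines in `main`, it reports the bucket change from dt=2019-11-24T00 to dt=2019-11-24T03 and the part counter jump from 5257 to 5263.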

Would greatly appreciate if anyone is able to shed light on this issue. Happy to provide full logs if needed.
Thanks

Re: Flink 1.9.1 KafkaConnector missing data (1M+ records)

Kostas Kloudas-2
Hi Harrison,

One thing to keep in mind is that Flink will only write files if there
is data to write. If, for example, your partition is not active for a
period of time, then no files will be written.
Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02
are entirely skipped?

In addition, for the "duplicates", it would help if you could share a
bit more information about your BucketAssigner.
How are these names assigned to the files and what does TT stand for?
Can it be that there are a lot of events for partition 4 that fill up
2 part files for that duration? I am asking because the counters of
the 2 part files differ.

Cheers,
Kostas

On Tue, Nov 26, 2019 at 1:09 AM Harrison Xu <[hidden email]> wrote:


Re: Flink 1.9.1 KafkaConnector missing data (1M+ records)

Harrison Xu
Thank you for your reply,

Some clarification:

We have configured the BucketAssigner to use the Kafka record timestamp. The exact bucketing behavior is as follows:
private static final DateTimeFormatter formatter = DateTimeFormatter
    .ofPattern("yyyy-MM-dd'T'HH");

// Bucket by topic, hour of the Kafka record timestamp (UTC), and partition.
@Override
public String getBucketId(KafkaRecord record, BucketAssigner.Context context) {
    return String.format(
        "%s/dt=%s/partition_%s",
        record.getTopic(),
        Instant.ofEpochMilli(record.getTimestamp()).atZone(ZoneOffset.UTC).format(formatter),
        record.getPartition());
}
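
As a quick, self-contained check of what this assigner produces, the sketch below applies the same format string and formatter to plain values instead of a `KafkaRecord` (whose definition is not shown here). `BucketIdDemo` and `bucketId` are hypothetical names, and the sample timestamp is made up for illustration:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical stand-alone version of the bucketing logic, for experimenting outside Flink. */
final class BucketIdDemo {

    private static final DateTimeFormatter FORMATTER =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH");

    // Same format string as the assigner above, but with plain arguments
    // in place of the KafkaRecord getters.
    static String bucketId(String topic, int partition, long timestampMillis) {
        return String.format(
            "%s/dt=%s/partition_%s",
            topic,
            Instant.ofEpochMilli(timestampMillis).atZone(ZoneOffset.UTC).format(FORMATTER),
            partition);
    }

    public static void main(String[] args) {
        // Illustrative record timestamp inside one of the skipped hours.
        long ts = Instant.parse("2019-11-24T01:30:00Z").toEpochMilli();
        System.out.println(bucketId("digest_features", 4, ts));
        // prints "digest_features/dt=2019-11-24T01/partition_4"
    }
}
```

The bucket depends only on the record's own timestamp, topic, and partition, not on when the record is consumed, so consumer lag by itself should delay the dt=2019-11-24T01 and dt=2019-11-24T02 buckets rather than remove them.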

For each record, we write only its offset to the S3 object as a sanity check. It is easy to detect missing or duplicate offsets. To answer your questions:

> Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02
> are entirely skipped?
No, because even if the producer had been idle during these hours, we would expect no missing offsets. We observed both millions of missing records and missing datetime partitions (2019-11-24T01 and 2019-11-24T02). Further, the producer was very active during this time.
I want to emphasize that the consumer for this exact TopicPartition was falling behind (>1 hour lag); this degree of lag was observed only for this partition. (The consumer eventually caught up.) It's normal for the consumer to fall behind the producer for short bursts, but we definitely do not expect missing records as a result. There were millions of records whose timestamps fall into dt 2019-11-24T01 and 2019-11-24T02, and they were entirely skipped by the writer.


> what does TT stand for?
It's simply the convention for serializing a datetime as a string.


> Can it be that there are a lot of events for partition 4 that fill up
> 2 part files for that duration?
We are using the BulkWriter. I am under the impression that this writer should produce only one file per checkpoint interval, which we have configured to be 5 minutes. As you can see, the preceding commits follow this pattern of one commit per checkpoint interval, which is what we expect. It's very strange that two files for the same TopicPartition (same TaskManager) were committed within such a short interval.


I am eager to hear your reply and understand what we're seeing.

Thanks,
Harrison

On Thu, Nov 28, 2019 at 6:43 AM Kostas Kloudas <[hidden email]> wrote:

Re: Flink 1.9.1 KafkaConnector missing data (1M+ records)

Kostas Kloudas-2
Hi Harrison,

Really sorry for the late reply.
Do you have any insight into whether the missing records were read by
the consumer and just the StreamingFileSink failed to write their
offsets, or whether the Kafka consumer did not even read them or
dropped them for some reason? I am asking this in order to narrow down
the problem. In addition, did you see anything out of the ordinary in
the logs?
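
One way to tell these two cases apart might be to track offsets in the stream itself, for example in a pass-through operator between the source and the sink. The sketch below shows only the bookkeeping, in plain Java; `OffsetJumpDetector` and `observe` are hypothetical names, wiring it into a Flink operator is left out, and it assumes offsets within a partition normally increase by one (which does not hold for compacted topics):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical bookkeeping: remembers the last offset seen per topic partition. */
final class OffsetJumpDetector {

    private final Map<String, Long> lastOffsetByPartition = new HashMap<>();

    /**
     * Records the offset and returns how many offsets were skipped since the
     * previous record of the same topic partition. Returns 0 for the first
     * record, for contiguous offsets, and for replays of older offsets.
     */
    long observe(String topic, int partition, long offset) {
        String key = topic + "-" + partition;
        Long last = lastOffsetByPartition.put(key, offset);
        if (last == null) {
            return 0;
        }
        return Math.max(0, offset - last - 1);
    }

    public static void main(String[] args) {
        OffsetJumpDetector detector = new OffsetJumpDetector();
        detector.observe("digest_features", 4, 10);
        detector.observe("digest_features", 4, 11);
        long skipped = detector.observe("digest_features", 4, 15);
        System.out.println("skipped offsets: " + skipped); // prints "skipped offsets: 3"
    }
}
```

If such a detector sees no jumps while the sink output still has gaps, that would point at the sink side; jumps seen right after the source would point at the consumer.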

I am also cc'ing Becket, who may know a bit more about the Kafka
consumer side of things.

Cheers,
Kostas

On Mon, Dec 2, 2019 at 10:00 PM Harrison Xu <[hidden email]> wrote:
