Hi! Have any other devs noticed Flink missing Kafka records in long-running Flink jobs? When I re-run my Flink job and start from the earliest Kafka offset, Flink processes the events correctly. I'm using Flink v1.11.1. I have a simple job that takes records (Requests) from Kafka and serializes them to S3. Pretty basic. No related issues in the text logs. I'm hoping I just have a configuration issue. I'm guessing idleness is working in a way that I'm not expecting. Any ideas? - Dan

    void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
        Properties kafkaSourceProperties = getKafkaSourceProperties("logrequest");

        SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
                new FlinkKafkaConsumer<>(
                        getInputRequestTopic(),
                        getProtoDeserializationSchema(Request.class),
                        kafkaSourceProperties))
            .uid("source-request")
            .name("Request")
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Request>forBoundedOutOfOrderness(maxOutOfOrderness)
                    .withIdleness(Duration.ofMinutes(1)));

        executeLogRequest(rawRequestInput);
        env.execute("log-request");
    }

    void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput) {
        AvroWriterFactory<Request> factory = getAvroWriterFactory(Request.class);

        rawRequestInput.addSink(
                StreamingFileSink
                    .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"), factory)
                    .withBucketAssigner(new DateHourBucketAssigner<Request>(
                            request -> request.getTiming().getEventApiTimestamp()))
                    .withRollingPolicy(OnCheckpointRollingPolicy.build())
                    .withOutputFileConfig(createOutputFileConfig())
                    .build())
            .uid("sink-s3-raw-request")
            .name("S3 Raw Request");
    }
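For context, the bounded-out-of-orderness strategy used above emits a watermark that trails the highest event timestamp seen so far by the configured bound. A minimal self-contained sketch of that rule (no Flink dependency; the class name and the 10-second bound are invented for illustration):

```java
import java.time.Duration;
import java.util.List;

// Minimal model of the bounded-out-of-orderness watermark rule:
// the watermark trails the max event timestamp seen so far by the
// configured bound (Flink additionally subtracts 1 ms).
public class WatermarkSketch {
    static long watermarkAfter(List<Long> eventTimestamps, Duration maxOutOfOrderness) {
        long maxTs = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            maxTs = Math.max(maxTs, ts);
        }
        // Records with a timestamp at or below this value are considered late.
        return maxTs - maxOutOfOrderness.toMillis() - 1;
    }

    public static void main(String[] args) {
        // Assumed 10s bound, purely for illustration.
        long wm = watermarkAfter(List.of(1_000L, 12_000L, 9_000L), Duration.ofSeconds(10));
        System.out.println("watermark=" + wm); // 12000 - 10000 - 1 = 1999
    }
}
```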
Hi Dan, Can you describe under which conditions you are missing records (after a machine failure, after a Kafka failure, after taking and restoring from a savepoint, ...)? Are many records missing? Are the "first records" or the "latest records" missing? Are individual records missing, or larger blocks of data? I don't think there's a bug in Flink or the Kafka connector; maybe it's just a configuration or system design issue. On Sun, Apr 25, 2021 at 9:56 AM Dan Hill <[hidden email]> wrote:
Hey Robert. Nothing weird. I was trying to find recent records (not the latest). No savepoints (the job had only been running for about a day). No checkpoint issues (all successes). I don't know how many records are missing. I removed the withIdleness. The other parts are very basic, and the text logs look pretty useless. On Mon, Apr 26, 2021 at 11:07 AM Robert Metzger <[hidden email]> wrote:
Hi Dan, could you check which records are missing? I suspect they could be records emitted right before a rollover of the bucketing strategy, from an otherwise idle partition. If so, it could indeed be connected to idleness. Idleness tells Flink not to wait on a particular partition before advancing the watermark. If a record then appears in a previously idle partition with an event timestamp behind the watermark of the other partitions, that record is deemed late and is discarded. On Tue, Apr 27, 2021 at 2:42 AM Dan Hill <[hidden email]> wrote:
Hey Arvid, I'll try to repro sometime in the next few weeks. I need to make some larger changes to produce a full diff of what is being dropped. On Thu, Apr 29, 2021 at 4:03 AM Arvid Heise <[hidden email]> wrote: