Hello
According to https://issues.apache.org/jira/browse/FLINK-4618, "FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka". Is the same behavior expected of FlinkKafkaConsumer010?

A document in Kafka is making my job fail, and on restart of the job (either through the restart strategy, or after stopping the job and running it again) I want processing to continue from the next document in the partition.

Checkpoints are enabled:

    env.enableCheckpointing(1000);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

Relevant log lines:

    INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoints to filesystem "file:/tmp/checkpoints")
    taskmanager_4 | 2017-11-21 17:31:42,873 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Setting restore state in the FlinkKafkaConsumer.
    taskmanager_4 | 2017-11-21 17:31:42,875 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 4 will commit offsets back to Kafka on completed checkpoints

Also, if a TaskManager (other than the one that fails) has already finished reading and processing a record from Kafka, then after the job is cancelled and restarted, that already-completed record is retrieved and processed again, together with the failing one.

Versions: flink-1.3.2, kafka_2.12-0.11.0.1

Thanks!
- Robert
Hi Robert,
As expected with exactly-once guarantees, a record that caused a Flink job to fail will be reprocessed when the job restarts.

For a specific "corrupt" record that sends the job into a fail-and-restart loop, there is a way to let the Kafka consumer skip that record: return null when attempting to deserialize the corrupt record (specifically, from the `deserialize` method of the provided `DeserializationSchema`).

Cheers,
Gordon

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
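A minimal, self-contained sketch of the skip-on-null idea described above. To keep it runnable without Flink on the classpath, `RecordDeserializer` is a hypothetical stand-in for Flink's `DeserializationSchema<T>` (whose real `deserialize(byte[] message)` also declares `throws IOException`), and `consume` is a simplified model of the consumer loop, not the connector's actual code. The "id:value" record format is likewise made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SkipCorruptRecords {

    // Hypothetical stand-in for Flink's DeserializationSchema<T>; the real
    // interface has more methods (e.g. isEndOfStream) that this sketch omits.
    interface RecordDeserializer<T> {
        T deserialize(byte[] message);
    }

    // Accepts records shaped like "id:value" and returns null for anything else,
    // signalling that the record should be skipped instead of failing the job.
    static final class LenientDeserializer implements RecordDeserializer<String> {
        @Override
        public String deserialize(byte[] message) {
            String text = new String(message, StandardCharsets.UTF_8);
            int sep = text.indexOf(':');
            if (sep <= 0 || sep == text.length() - 1) {
                return null; // corrupt record: skip it rather than throw
            }
            return text;
        }
    }

    static final class ConsumeResult<T> {
        final List<T> emitted = new ArrayList<>();
        long nextOffset = 0; // offset of the next record to fetch
    }

    // Simplified model of consuming one partition: null results are not
    // emitted, but the offset still moves past them.
    static <T> ConsumeResult<T> consume(List<byte[]> partition, RecordDeserializer<T> schema) {
        ConsumeResult<T> result = new ConsumeResult<>();
        for (byte[] raw : partition) {
            T value = schema.deserialize(raw);
            if (value != null) {
                result.emitted.add(value);
            }
            result.nextOffset++;
        }
        return result;
    }

    static List<byte[]> toBytes(String... records) {
        List<byte[]> out = new ArrayList<>();
        for (String r : records) {
            out.add(r.getBytes(StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        ConsumeResult<String> result =
            consume(toBytes("1:a", "garbage", "2:b", ":", "3:c"), new LenientDeserializer());
        System.out.println(result.emitted + " nextOffset=" + result.nextOffset);
    }
}
```

Running `main` prints `[1:a, 2:b, 3:c] nextOffset=5`: the two malformed records are dropped, but the offset still advances past them. In a real job the same null-returning logic would live in your `DeserializationSchema` implementation; throwing from `deserialize` instead fails the job and replays the record on restart.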
Thanks Gordon
But what if there is an uncaught exception while processing the record (during normal job execution, after deserialization)? Once the restart strategy's failure rate is exceeded, the job fails, and when it is run again it starts at the same offset, right? Is there a way to avoid this and 'automatically' start at offset+1? Or is the recipe to manually retrieve the record at partitionX/offsetY for the group and then restart?

Best regards
-Robert

>-------- Original message --------
>From: "Tzu-Li (Gordon) Tai" [hidden email]
>Subject: Re: FlinkKafkaConsumer010 does not start from the next record on startup from offsets in Kafka
>To: [hidden email]
>Sent on: 22.11.2017 14:57
Hi Robert,
Uncaught exceptions that send the job into a fail-and-restart loop are similar to the corrupt-record case I mentioned.

With exactly-once guarantees, the job rolls back to the last completed checkpoint, which "resets" the Flink consumer to an earlier Kafka partition offset, so the failing record will eventually be processed again. Currently there is no way to manipulate the "reset" offset on restore from failure: it is strictly reset to the offset stored in the last completed checkpoint, otherwise exactly-once would be violated.

Rob wrote
> Or maybe the recipe is to manually retrieve the record at
> partitionX/offsetY for the group and then restart?

This would not work, because exactly-once is achieved with the offsets that Flink stores in its checkpoints, not the offsets that are committed back to Kafka.

Cheers,
Gordon
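The rollback behavior can be illustrated with a toy model, assuming a single partition, a single consumer subtask, and a checkpoint that completes after every two records. `Job`, `runAttempt`, and the `"poison"` record are hypothetical names, and the `sideEffects` list stands in for externally visible effects, which a checkpoint rollback does not undo. The model also suggests why an already-processed record comes back: `r2` is processed after the last checkpoint, so it is replayed on every restart, and moving the committed Kafka offset has no influence on where the job restores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CheckpointReplay {

    // Toy model of one Kafka partition read by one consumer subtask.
    // It only mimics the offset bookkeeping; it is not Flink's implementation.
    static final class Job {
        final List<String> partition;
        final int checkpointInterval;          // a checkpoint completes after every N records
        long checkpointedOffset = 0;           // offset stored in the last completed checkpoint
        long committedKafkaOffset = 0;         // offset committed back to Kafka (never read on restore)
        final List<String> sideEffects = new ArrayList<>(); // effects outside Flink state, never rolled back

        Job(List<String> partition, int checkpointInterval) {
            this.partition = partition;
            this.checkpointInterval = checkpointInterval;
        }

        // One execution attempt. Returns true if the partition was fully processed,
        // false if processing hit the poison record (an uncaught exception).
        boolean runAttempt() {
            long offset = checkpointedOffset;  // restore always starts from the checkpoint
            while (offset < partition.size()) {
                String record = partition.get((int) offset);
                if (record.equals("poison")) {
                    return false;
                }
                sideEffects.add(record);
                offset++;
                if (offset % checkpointInterval == 0) {
                    checkpointedOffset = offset;
                    committedKafkaOffset = offset; // commit on completed checkpoint
                }
            }
            return true;
        }
    }

    public static void main(String[] args) {
        Job job = new Job(Arrays.asList("r0", "r1", "r2", "poison", "r4"), 2);
        boolean finished = job.runAttempt();      // first attempt fails at "poison"
        job.committedKafkaOffset = 4;             // manually move the group offset past "poison"
        for (int restart = 1; restart <= 2 && !finished; restart++) {
            finished = job.runAttempt();          // still restores from checkpointedOffset
        }
        System.out.println("finished=" + finished);
        System.out.println("sideEffects=" + job.sideEffects);
        System.out.println("checkpointedOffset=" + job.checkpointedOffset
            + " committedKafkaOffset=" + job.committedKafkaOffset);
    }
}
```

Running `main` prints `finished=false`, `sideEffects=[r0, r1, r2, r2, r2]`, and `checkpointedOffset=2 committedKafkaOffset=4`: every restart replays from offset 2 and fails again at the poison record, regardless of the manually moved Kafka offset.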
Gordon, thanks for clarifying this!
For my experimental project I decided to disable checkpointing and use kafkaConsumer.setStartFromGroupOffsets() (explicitly, although the docs state it is the default). I verified with kafka-consumer-offset-checker.sh that, after the job fails and is restarted, it skips the offsets it was at when it failed and continues with the newer Kafka records. This suits me, as another process will go after the left-behind records and try to recover what it can.

Best regards
-Robert

>-------- Original message --------
>From: "Tzu-Li (Gordon) Tai" [hidden email]
>Subject: Re: FlinkKafkaConsumer010 does not start from the next record on startup from offsets in Kafka
>To: [hidden email]
>Sent on: 23.11.2017 09:01
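For the setup described above, the Flink Kafka connector documentation notes that when checkpointing is disabled, the consumer relies on the Kafka client's periodic auto-commit, controlled by the `enable.auto.commit` and `auto.commit.interval.ms` properties. The sketch below only builds those `Properties`; the server address, group id, topic name, and one-second interval are placeholders, and the Flink calls appear as comments so the snippet compiles without the connector on the classpath.

```java
import java.util.Properties;

public class AutoCommitConsumerConfig {

    // Consumer properties for a job that runs WITHOUT checkpointing, so offset
    // commits come from the Kafka client's own timer. Values are placeholders.
    static Properties autoCommitProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);               // offsets are tracked per consumer group
        props.setProperty("enable.auto.commit", "true");      // client commits offsets periodically
        props.setProperty("auto.commit.interval.ms", "1000"); // commit interval (placeholder value)
        return props;
    }

    public static void main(String[] args) {
        Properties props = autoCommitProperties("localhost:9092", "my-flink-group");
        // With the Flink connector on the classpath, these would be passed on, e.g.:
        //   FlinkKafkaConsumer010<String> consumer =
        //       new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
        //   consumer.setStartFromGroupOffsets();
        // and env.enableCheckpointing(...) would not be called.
        props.list(System.out);
    }
}
```

Because auto-commit runs on a timer rather than after each record is processed, the committed group offset can move past records that were fetched but not yet processed when the job failed, which is presumably why the restarted job skipped ahead. This gives no delivery guarantee for those records: depending on timing, they can be skipped or reprocessed.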