FlinkKafkaConsumer010 does not start from the next record on startup from offsets in Kafka

Posted by Rob on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/FlinkKafkaConsumer010-does-not-start-from-the-next-record-on-startup-from-offsets-in-Kafka-tp16846.html

Hello
according to https://issues.apache.org/jira/browse/FLINK-4618 "FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka".
Is the same behavior expected of FlinkKafkaConsumer010?
A document in Kafka is failing my job and I want on restart of the job (via the restart strategy or after stop and run again the job) processing to continue from the next document in the partition.
Checkpoints are enabled:

            env.enableCheckpointing(1000);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
            env.getCheckpointConfig().setCheckpointTimeout(60000);
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoints to filesystem "file:/tmp/checkpoints")
taskmanager_4  | 2017-11-21 17:31:42,873 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Setting restore state in the FlinkKafkaConsumer.
taskmanager_4  | 2017-11-21 17:31:42,875 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 4 will commit offsets back to Kafka on completed checkpoints

Also, if a TM (other than the one that fails) has managed to successfully complete reading and processing a record from Kafka, after the job is cancelled and restarted, the already complete record is retrieved and processed again together with the failing one

flink-1.3.2
kafka_2.12-0.11.0.1

Thanks!
- Robert