FlinkKafkaConsumer subscribes to partitions in restoredState only.

Posted by ninad on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/FlinkKafkaConsumer-subscribes-to-partitions-in-restoredState-only-tp14233.html

Hello,
We're noticing that FlinkKafkaConsumer subscribes only to the partitions in its restored state. Partitions that aren't in the restored state are never read, and we have to restart the job for FlinkKafkaConsumer to read from all partitions.

Here are the details:

Environment:
- Flink 1.3.0, on a standalone cluster as well as on a Cloudera Hadoop cluster
- flink-connector-kafka-0.9_2.11:1.3.0

Steps to reproduce:
- Start a job that reads from a Kafka topic.
- Bring down all Kafka brokers.
- Bring the Kafka brokers back up.

At this point, we see this in the logs:

2017-07-12 19:53:23,661 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading 8 partitions with offsets in restored state: {KafkaTopicPartition{topic='topic.event.filter', partition=0}=3, KafkaTopicPartition{topic='topic.event.filter', partition=2}=18, KafkaTopicPartition{topic='topic.event.filter', partition=8}=1, KafkaTopicPartition{topic='topic.event.filter', partition=9}=-1, KafkaTopicPartition{topic='topic.event.filter', partition=3}=17, KafkaTopicPartition{topic='topic.event.filter', partition=4}=17, KafkaTopicPartition{topic='topic.event.filter', partition=5}=17, KafkaTopicPartition{topic='topic.event.filter', partition=6}=1}

Flink subscribes to only these 8 partitions because they are in the restored state. The remaining partitions of the topic are not subscribed to at all.

Looking at the code, I don't see anywhere that partitions missing from the restored state get subscribed to.

Relevant code:

if (restoredState != null) {
    for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
        // Only partitions that already have an entry in restoredState get a start offset;
        // any other partition in kafkaTopicPartitions is skipped (there is no else branch).
        if (restoredState.containsKey(kafkaTopicPartition)) {
            subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
        }
    }

    LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
        getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
}
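To make the effect of that loop concrete, here is a small standalone sketch that reproduces it outside Flink. It is a simplification under stated assumptions: partition ids are plain Integers instead of KafkaTopicPartition, the class name RestoredStateFilter is made up for illustration, and the topic is assumed to have 10 partitions (0 to 9). The real partition count isn't given above; the log only shows that partitions 1 and 7 are absent from the restored state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RestoredStateFilter {

    // Mirrors the quoted loop: only partitions that already have an entry in
    // restoredState receive a start offset. Integer ids stand in for
    // KafkaTopicPartition (simplifying assumption).
    static Map<Integer, Long> selectStartOffsets(List<Integer> discoveredPartitions,
                                                 Map<Integer, Long> restoredState) {
        Map<Integer, Long> subscribed = new TreeMap<>();
        for (Integer partition : discoveredPartitions) {
            if (restoredState.containsKey(partition)) {
                subscribed.put(partition, restoredState.get(partition));
            }
            // No else branch: a partition missing from restoredState is silently skipped.
        }
        return subscribed;
    }

    public static void main(String[] args) {
        // Hypothetical topic with 10 partitions (0-9).
        List<Integer> discovered = new ArrayList<>();
        for (int p = 0; p < 10; p++) {
            discovered.add(p);
        }

        // Restored offsets copied from the log line above.
        Map<Integer, Long> restored = new HashMap<>();
        restored.put(0, 3L);
        restored.put(2, 18L);
        restored.put(3, 17L);
        restored.put(4, 17L);
        restored.put(5, 17L);
        restored.put(6, 1L);
        restored.put(8, 1L);
        restored.put(9, -1L);

        Map<Integer, Long> subscribed = selectStartOffsets(discovered, restored);
        System.out.println("subscribed: " + subscribed.keySet());

        // Partitions that were discovered but never subscribed to.
        List<Integer> skipped = new ArrayList<>(discovered);
        skipped.removeAll(subscribed.keySet());
        System.out.println("skipped: " + skipped);
    }
}
```

Running it prints `subscribed: [0, 2, 3, 4, 5, 6, 8, 9]` and `skipped: [1, 7]`, which matches the 8 partitions in the log and shows that nothing in this loop ever picks up the other two.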


We're not setting 'setStartFromEarliest' or 'setStartFromLatest', so it's using the default: 'setStartFromGroupOffsets'.
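For contrast, here is a hypothetical sketch of the behavior one might expect instead. Partitions with a restored offset resume from it, and any partition without one falls back to the configured startup mode (group offsets, the default here) rather than being dropped. This is not Flink's code: the class name FallbackStartOffsets and the GROUP_OFFSET_SENTINEL marker are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FallbackStartOffsets {

    // Hypothetical marker meaning "start from the committed group offset".
    // Illustrative only; this is not a Flink constant.
    static final long GROUP_OFFSET_SENTINEL = Long.MIN_VALUE;

    // Restored partitions keep their checkpointed offset; partitions with no
    // restored entry fall back to the startup mode instead of being skipped.
    static Map<Integer, Long> selectStartOffsets(List<Integer> discoveredPartitions,
                                                 Map<Integer, Long> restoredState) {
        Map<Integer, Long> subscribed = new TreeMap<>();
        for (Integer partition : discoveredPartitions) {
            Long restoredOffset = restoredState.get(partition);
            subscribed.put(partition,
                    restoredOffset != null ? restoredOffset : GROUP_OFFSET_SENTINEL);
        }
        return subscribed;
    }

    public static void main(String[] args) {
        Map<Integer, Long> restored = new TreeMap<>();
        restored.put(0, 3L);
        restored.put(2, 18L);

        Map<Integer, Long> subscribed =
                selectStartOffsets(Arrays.asList(0, 1, 2), restored);
        for (Map.Entry<Integer, Long> e : subscribed.entrySet()) {
            String start = e.getValue() == GROUP_OFFSET_SENTINEL
                    ? "group offset" : "offset " + e.getValue();
            System.out.println("partition " + e.getKey() + " -> " + start);
        }
    }
}
```

With this fallback, partition 1 is still subscribed and starts from its group offset, so a restart would not be needed to pick it up.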

Are we missing a setting, or doing something wrong? Please let us know. Thanks!