Flink does not read from some Kafka Partitions


Ruby Andrews
Hello,

My team ran into some behavior we did not expect when we tried to get an existing Flink app to read from a re-sized Kafka topic. Here are the highlights: 
- We are using the FlinkKafkaConsumer010.
- We re-partitioned (added partitions to) an existing topic that our Flink app reads, so that the topic now has 8 partitions. Following that, we re-deployed our task managers, expecting them to start reading the new partitions.
- 8 task managers read from the topic, but they did NOT read all of the partitions. 3 of the partitions had 2 task managers reading from them and 3 of the partitions had 0 task managers reading from them. My team had expected that Flink would automatically read from all partitions, 1 task manager per partition.
- To force the app to read from all partitions, we added the flink.partition-discovery.interval-millis property to our Kafka consumer properties and re-deployed the task managers (see the sketch after this list). We expected this setting to cause Flink to discover (and start reading) all partitions.
- We did not see a change in the Kafka readers: there were still 3 partitions not being read.
- Finally, we changed the ID of the Flink operator that reads the Kafka topic and re-deployed the task managers again.
- After changing the ID, the app started reading from all partitions. 
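
Here is a minimal sketch of roughly how our consumer is wired up, assuming Flink 1.4; the topic name, bootstrap servers, group id, interval, and operator uid below are placeholders, not our real configuration:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "my-consumer-group");
    // Enables periodic partition discovery (every 10s); only honored on Flink 1.4+.
    props.setProperty("flink.partition-discovery.interval-millis", "10000");

    DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props))
        .uid("kafka-source"); // changing this uid discards the operator's restored state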

 
What is the correct way to pick up partitions after re-partitioning a Kafka topic? 

Thanks,
Ruby  
Re: Flink does not read from some Kafka Partitions

Timo Walther
Hi Ruby,

Which Flink version are you using? If you look into the code of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase, you can see that whether partition discovery is used depends on the Flink version.
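
As a quick sanity check, here is a minimal sketch (the class name is just an example) that prints the Flink version actually on the job's classpath:

    import org.apache.flink.runtime.util.EnvironmentInformation;

    public class FlinkVersionCheck {
        public static void main(String[] args) {
            // Prints the version of the flink-runtime jar on the classpath,
            // which determines whether partition discovery is available.
            System.out.println(EnvironmentInformation.getVersion());
        }
    }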

Regards,
Timo



Re: Flink does not read from some Kafka Partitions

Tzu-Li (Gordon) Tai
Hi,

Timo is correct: partition discovery is supported by the consumer only starting from Flink 1.4.

The expected behaviour without partition discovery is that the list of partitions picked up on the first execution of the job remains the list of subscribed partitions across all executions. When restoring from a savepoint / checkpoint, new partitions will not be discovered. The reason new partitions were discovered after you changed the UID of the consumer operator is that the consumer is then considered a completely new operator without any restored state.

Since Flink 1.4, you can enable partition discovery by setting flink.partition-discovery.interval-millis. This can be turned on / off at the start of any execution attempt. For example, you can have it off initially, take a savepoint, and when restoring change that configuration to enable discovery.
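
A minimal sketch of that restore-time toggle (the 10-second interval is just an example); since 1.4 the consumer also exposes the property key as a constant on FlinkKafkaConsumerBase, which avoids typos in the string:

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

    Properties props = new Properties();
    // First run: leave the property unset, so the partition list is fixed at
    // the first execution. When resubmitting from the savepoint, set it so
    // the consumer probes for new partitions every 10 seconds:
    props.setProperty(
        FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10000");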

Cheers,
Gordon




Re: Flink does not read from some Kafka Partitions

Ruby Andrews
Thank you both for your responses. It looks like I may have inadvertently used the 1.3.1 libraries instead of 1.4.

Ruby
