Hello,
My team ran into some behavior we did not expect when we tried to get an existing Flink app to read from a re-sized Kafka topic. Here are the highlights:

- We are using the FlinkKafkaConsumer010.
- We re-partitioned (added partitions to) an existing topic that our Flink app reads, so that the topic now has 8 partitions. Following that, we re-deployed our task managers. We thought that the task managers would start reading the new partitions.
- 8 task managers read from the topic, but they did NOT read all of the partitions: 3 of the partitions had 2 task managers reading from them, and 3 of the partitions had 0 task managers reading from them. My team had expected that Flink would automatically read from all partitions, 1 task manager per partition.
- To force the app to read from all partitions, we added the flink.partition-discovery.interval-millis property to our Kafka consumer properties and re-deployed the task managers. We expected this setting to cause Flink to discover (and start reading) all partitions.
- We did not see a change in the Kafka readers: there were still 3 partitions not being read.
- Finally, we changed the ID of the Flink operator that reads the Kafka topic and re-deployed the task managers again.
- After changing the ID, the app started reading from all partitions.

What is the correct way to pick up partitions after re-partitioning a Kafka topic?

Thanks,
Ruby
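For reference, the property mentioned above is set on the consumer's Properties object before constructing the FlinkKafkaConsumer010. A minimal sketch of how that configuration would look (the broker address, group id, and 10-second interval are illustrative assumptions, not values from the thread):

```java
import java.util.Properties;

public class KafkaDiscoveryConfig {

    // Build Kafka consumer properties with partition discovery enabled.
    // The flink.partition-discovery.interval-millis key is only honored
    // by the consumer on Flink 1.4 and later.
    public static Properties buildProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumption: broker address
        props.setProperty("group.id", "my-flink-app");            // assumption: consumer group id
        // Probe the topic for new partitions every 10 seconds (illustrative interval).
        props.setProperty("flink.partition-discovery.interval-millis", "10000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        System.out.println(props.getProperty("flink.partition-discovery.interval-millis"));
    }
}
```

These properties would then be passed to the FlinkKafkaConsumer010 constructor along with the topic name and a deserialization schema.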
Hi Ruby,
Which Flink version are you using? When looking into the code of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase, you can see that whether partition discovery is used depends on the Flink version.

Regards,
Timo

On 15.05.18 at 02:01, Ruby Andrews wrote: Hello,
Hi,
Timo is correct: partition discovery is supported by the consumer only starting from Flink 1.4.

The expected behaviour without partition discovery enabled is that the list of partitions picked up on the first execution of the job will be the list of subscribed partitions across all executions. When restoring from a savepoint / checkpoint, discovery of new partitions will not occur.

The reason new partitions were discovered after you changed the UID of the consumer operator is that the consumer is then considered a completely new operator, without any restored state.

Since Flink 1.4, you can choose to enable partition discovery by setting flink.partition-discovery.interval-millis. This can be turned on / off at the start of any execution attempt. For example, you can have it off initially, take a savepoint, and when restoring change that configuration to enable discovery.

Cheers,
Gordon

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
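Gordon's explanation can be summarized as a toy model. The sketch below is not Flink internals, just an illustration of the three cases in the thread: restoring without discovery keeps the old partition list, restoring with discovery adds the new partitions, and a changed operator UID means no restored state at all, so the full current list is taken:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionRestoreSketch {

    // Illustrative model (not actual Flink code) of which partitions a
    // restored consumer ends up reading.
    static List<Integer> partitionsToRead(List<Integer> restored,
                                          List<Integer> current,
                                          boolean discoveryEnabled) {
        if (restored == null) {
            // No restored state (e.g. operator UID changed): subscribe to everything.
            return current;
        }
        if (!discoveryEnabled) {
            // Restoring without discovery: stick to the partitions from the savepoint.
            return restored;
        }
        // Restoring with discovery on: keep restored partitions and add new ones.
        List<Integer> result = new ArrayList<>(restored);
        for (Integer p : current) {
            if (!result.contains(p)) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> beforeRepartition = Arrays.asList(0, 1, 2, 3, 4);      // 5 old partitions
        List<Integer> afterRepartition = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7); // 8 partitions now

        System.out.println(partitionsToRead(beforeRepartition, afterRepartition, false).size()); // 5
        System.out.println(partitionsToRead(beforeRepartition, afterRepartition, true).size());  // 8
        System.out.println(partitionsToRead(null, afterRepartition, false).size());              // 8
    }
}
```

The last case is exactly what happened when Ruby changed the operator ID: with no state to restore, the consumer subscribed to all current partitions.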
Thank you both for your responses. It looks like I may have inadvertently used 1.3.1 libraries instead of 1.4.

Ruby

On Wed, May 16, 2018 at 3:12 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote: Hi,