Hey guys, we are trying to add external monitoring to our system, but we can only get the lag on our Kafka topics while the Flink job is running: if, for some reason, the Flink job fails, we get no visibility into how big the lag is. (Besides that, the lag Flink itself reports is not accurate and produces a lot of -Inf values, which I already discussed before.) While looking into the problem, we noticed that the FlinkKafkaConsumer never calls `subscribe` on the topics, and that's why the offsets are never stored back into Kafka, even though the driver itself calls `commitAsync`. Is there any reason for not subscribing to topics that I may have missed?
-- Julio Biason, Software Engineer
AZION | Deliver. Accelerate. Protect.
Office: +55 51 3083 8101 | Mobile: +55 51 99907 0554
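For context on what an external monitor measures: the lag of a partition is its log-end offset minus the offset the consumer group last committed to Kafka. The following minimal Java sketch assumes both sets of offsets have already been fetched into maps; the class name, the `topic-partition` string keys, and the input maps are hypothetical. A partition with no committed offset yields no value at all, which is the visibility gap described in this message.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper: computes consumer lag from offsets a monitor has already fetched.
// Keys such as "events-0" stand for topic-partition pairs (an assumption for this sketch).
final class LagMonitor {
    private LagMonitor() {}

    /**
     * Lag of one partition = log-end offset minus the group's committed offset.
     * Returns empty when either offset is unknown, e.g. the group never committed
     * anything for that partition, so the monitor cannot report a lag.
     */
    static OptionalLong partitionLag(Map<String, Long> endOffsets,
                                     Map<String, Long> committedOffsets,
                                     String partition) {
        Long end = endOffsets.get(partition);
        Long committed = committedOffsets.get(partition);
        if (end == null || committed == null) {
            return OptionalLong.empty();
        }
        // Clamp at zero in case the committed offset is briefly ahead of the fetched end offset.
        return OptionalLong.of(Math.max(0L, end - committed));
    }

    /** Sum of lag over the partitions whose lag can actually be computed. */
    static long totalKnownLag(Map<String, Long> endOffsets, Map<String, Long> committedOffsets) {
        long total = 0L;
        for (String partition : endOffsets.keySet()) {
            OptionalLong lag = partitionLag(endOffsets, committedOffsets, partition);
            if (lag.isPresent()) {
                total += lag.getAsLong();
            }
        }
        return total;
    }
}
```

Note that `totalKnownLag` silently skips partitions without a committed offset; a real monitor would probably want to alert on those separately.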
Hi, Julio:
1. Flink doesn't use subscribe because it needs to control partition assignment itself, which is important for implementing exactly-once semantics.
2. Can you share the versions you are using, including Kafka, the Kafka client, and Flink? We also use the Flink Kafka consumer, and we can monitor it correctly.
On Tue, Sep 4, 2018 at 3:09 AM Julio Biason <[hidden email]> wrote:
Liu, Renjie
Software Engineer, MVAD
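To make "control partition assignment itself" concrete: if every source subtask can compute which partitions it owns from nothing but the topic name, the partition number, and the job's parallelism, then no broker-side group coordinator can move partitions between subtasks behind the job's back. The following is a simplified sketch of such a deterministic, round-robin assignment; the class and method names are hypothetical, and this is not Flink's actual assigner code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified deterministic assignment (not Flink's real implementation).
final class PartitionAssignment {
    private PartitionAssignment() {}

    /** Index of the subtask that owns a partition; a pure function of its inputs. */
    static int ownerOf(String topic, int partition, int parallelism) {
        // Non-negative per-topic start index, so different topics don't all start at subtask 0.
        int start = (topic.hashCode() & 0x7FFFFFFF) % parallelism;
        // Consecutive partitions go to consecutive subtasks (round-robin).
        return (start + partition) % parallelism;
    }

    /** Partitions of one topic that the given subtask should read. */
    static List<Integer> partitionsFor(String topic, int numPartitions,
                                       int subtaskIndex, int parallelism) {
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (ownerOf(topic, p, parallelism) == subtaskIndex) {
                owned.add(p);
            }
        }
        return owned;
    }
}
```

Because the result depends only on its inputs, every subtask (and every restart with the same parallelism) agrees on the same partition ownership without any coordination.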
Hi Renjie,
1. From what I could grasp from the Kafka docs, you can subscribe and still use poll() to read from a specific offset. But I only read the beginning of it and didn't go deep into it.
2. We currently use Flink 1.4.2, Kafka 0.10.1, and the FlinkKafkaConsumer010.
On Tue, Sep 4, 2018 at 12:47 AM, Renjie Liu <[hidden email]> wrote:
Hi Julio,
As Renjie already mentioned, to achieve exactly-once semantics with the Kafka consumer, Flink needs to control the assignment of Kafka partitions to source subtasks. To add a bit more detail: each subtask writes the current offsets of the partitions assigned to it into Flink-managed state, and that write is coordinated with Flink's checkpoints. If the consumer were to use Kafka's automatic consumer group assignment (i.e. the subscribe API), it would have no control over exactly when partitions are reassigned across subtasks.
If I understood correctly, what you suggested in your last reply was to simply use poll() to query the offset whenever a partition is reassigned to another source subtask. This is problematic because there are no consistency guarantees between the offsets committed to Kafka and Flink's checkpoints. Committed offsets are, and should only be, used as a means to expose consumer progress to the outside world beyond the Flink job.
Cheers,
Gordon
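A toy model of why the checkpointed offsets, and not the offsets committed to Kafka, are what a restarted job resumes from: after a failure, each subtask looks at the offsets that all subtasks snapshotted in the last completed checkpoint and keeps the partitions that a deterministic rule assigns to it, even if the parallelism changed. This is only a sketch of the idea under stated assumptions: a single topic, integer partition keys, and a hypothetical `partition % parallelism` rule; the connector's real restore logic is more involved.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of restoring source offsets from checkpointed state.
final class CheckpointRestore {
    private CheckpointRestore() {}

    /** Hypothetical assignment rule: partition p belongs to subtask p % parallelism. */
    static int ownerOf(int partition, int parallelism) {
        return partition % parallelism;
    }

    /**
     * Starting offsets for one subtask after a restart.
     * checkpointedStates holds the partition -> next-offset map that each subtask
     * snapshotted in the last completed checkpoint. Offsets committed to Kafka are
     * deliberately not consulted: they are not guaranteed to match the checkpoint.
     */
    static Map<Integer, Long> restore(List<Map<Integer, Long>> checkpointedStates,
                                      int subtaskIndex, int parallelism) {
        Map<Integer, Long> mine = new HashMap<>();
        for (Map<Integer, Long> state : checkpointedStates) {
            for (Map.Entry<Integer, Long> e : state.entrySet()) {
                if (ownerOf(e.getKey(), parallelism) == subtaskIndex) {
                    mine.put(e.getKey(), e.getValue());
                }
            }
        }
        return mine;
    }
}
```

For example, a checkpoint taken with two subtasks can be restored with three: each new subtask picks up exactly the partitions the rule gives it, at the checkpointed offsets.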
On 4 September 2018 at 2:25:38 PM, Julio Biason ([hidden email]) wrote:
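Hi, Julio: Is checkpointing enabled in your job? With checkpointing enabled, the Flink Kafka connector commits offsets back to Kafka only when a checkpoint completes; with it disabled, the connector relies on the Kafka client's periodic auto-commit (`enable.auto.commit`) instead.
On Tue, Sep 4, 2018 at 11:43 PM Tzu-Li (Gordon) Tai <[hidden email]> wrote: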
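A simplified model of "commit offsets when a checkpoint completes": offsets are copied aside when a checkpoint is taken and handed to Kafka only once that checkpoint is confirmed, so a job that never completes a checkpoint never commits, and an external lag monitor sees nothing. The class and method names here are hypothetical; they only echo the shape of a snapshot callback followed by a checkpoint-complete callback, and the "committed" map stands in for what Kafka would store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model; not the connector's actual code.
final class CommitOnCheckpoint {
    // Latest processed offset per partition (next offset to read).
    private final Map<Integer, Long> currentOffsets = new HashMap<>();
    // Offsets snapshotted per checkpoint id, waiting for that checkpoint to complete.
    private final TreeMap<Long, Map<Integer, Long>> pending = new TreeMap<>();
    // Stand-in for the offsets an external monitor would read from Kafka.
    private final Map<Integer, Long> committedToKafka = new HashMap<>();

    void recordProcessed(int partition, long nextOffset) {
        currentOffsets.put(partition, nextOffset);
    }

    /** Checkpoint taken: copy the current offsets into (simulated) checkpoint state. */
    void snapshot(long checkpointId) {
        pending.put(checkpointId, new HashMap<>(currentOffsets));
    }

    /** Checkpoint confirmed: only now are that checkpoint's offsets committed. */
    void checkpointComplete(long checkpointId) {
        Map<Integer, Long> offsets = pending.get(checkpointId);
        if (offsets == null) {
            return; // unknown checkpoint, or already subsumed by a newer one
        }
        committedToKafka.putAll(offsets);
        // This snapshot and any older pending ones are no longer needed.
        pending.headMap(checkpointId, true).clear();
    }

    /** Copy of the offsets "visible in Kafka" to an outside observer. */
    Map<Integer, Long> committed() {
        return new HashMap<>(committedToKafka);
    }
}
```

Note that the committed value is the offset from the snapshot, not the latest processed one, so even with checkpointing on, the lag seen from outside trails the job by up to one checkpoint interval.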