Why FlinkKafkaConsumer doesn't subscribe to topics?

Why FlinkKafkaConsumer doesn't subscribe to topics?

Julio Biason
Hey guys,

We are trying to add external monitoring to our system, but we can only get the lag in Kafka topics while the Flink job is running -- if, for some reason, the Flink job fails, we get no visibility into how big the lag is.

(Besides that, Flink's own reporting is not accurate and produces a lot of -Inf, which I already discussed before.)

While looking at the problem, we noticed that the FlinkKafkaConsumer never calls `subscribe` on the topics, and that's why the offsets are never stored back in Kafka, even though the driver itself calls `commitAsync`.

Is there any reason for not subscribing to topics that I may have missed?

--
Julio Biason, Software Engineer
AZION  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554

Re: Why FlinkKafkaConsumer doesn't subscribe to topics?

Renjie Liu
Hi, Julio:
1. Flink doesn't use subscribe because it needs to control partition assignment itself, which is important for implementing exactly-once semantics.
2. Can you share the versions you are using, including Kafka, the Kafka client, and Flink? We also use the Flink Kafka consumer and we can monitor it correctly.
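
To make point 1 concrete, here is a minimal sketch of what "controlling partition assignment itself" can look like: a fixed rule maps each partition to a source subtask, so the mapping never changes unless the inputs do. The function name `assign_partitions` and the plain round-robin rule are assumptions for illustration only; this is not Flink's actual assigner.

```python
from collections import defaultdict


def assign_partitions(partitions, parallelism):
    """Map each partition id to a subtask index with a fixed round-robin rule.

    Hypothetical helper: illustrates deterministic assignment in general,
    not the rule Flink really uses.
    """
    assignment = defaultdict(list)
    for partition in sorted(partitions):
        assignment[partition % parallelism].append(partition)
    return dict(assignment)


# Six partitions spread over three source subtasks.
first = assign_partitions(range(6), parallelism=3)
second = assign_partitions(range(6), parallelism=3)

print(first)            # {0: [0, 3], 1: [1, 4], 2: [2, 5]}
print(first == second)  # True: same inputs always give the same mapping
```

With group-managed assignment (the `subscribe` API), the broker-side coordinator decides this mapping instead and may change it during a rebalance.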

--
Liu, Renjie
Software Engineer, MVAD

Re: Why FlinkKafkaConsumer doesn't subscribe to topics?

Julio Biason
Hi Renjie,

1. From what I could grasp from the Kafka docs, you can subscribe and still use poll() to capture a specific offset. But I only read the beginning of it and didn't go deep into it.

2. Currently, Flink 1.4.2, Kafka 0.10.1 and the FlinkKafkaConsumer010.


Re: Why FlinkKafkaConsumer doesn't subscribe to topics?

Tzu-Li (Gordon) Tai
Hi Julio,

As Renjie already mentioned, to achieve exactly-once semantics with the Kafka consumer, Flink needs to control the assignment of Kafka partitions to source subtasks.

To add a bit more detail: each subtask writes the current offsets of its assigned partitions to Flink-managed state, and that is coordinated with Flink’s checkpoints.
If it were to use Kafka’s automatic consumer group assignment (i.e. the subscribe API), the consumer would have no control over when exactly partitions are reassigned across subtasks.
If I understood correctly, what you suggested in your last reply was to simply use poll() to query the offset when a partition is reassigned to another source subtask.
This is problematic because there are no consistency guarantees between the committed offsets in Kafka and Flink’s checkpoints.
Committed offsets are, and should be, used only as a means to expose consumer progress to the outside world beyond the Flink job.
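
The distinction between checkpointed and committed offsets can be sketched with a toy model. Everything below (the `SourceSubtask` class, the `commit_succeeds` flag, the `kafka_committed` dict) is hypothetical and only stands in for the real components; it is not how Flink is implemented. It shows that recovery reads the checkpoint only, so a failed or stale commit in Kafka does not affect correctness.

```python
class SourceSubtask:
    """Toy model of one source subtask that owns a fixed set of partitions."""

    def __init__(self, partitions):
        self.offsets = {p: 0 for p in partitions}  # next offset to read, per partition
        self.checkpoint = dict(self.offsets)       # last state snapshot (source of truth)

    def read(self, partition, count):
        self.offsets[partition] += count

    def take_checkpoint(self, kafka_committed, commit_succeeds=True):
        # Snapshot the offsets first; publishing them to Kafka is best effort.
        self.checkpoint = dict(self.offsets)
        if commit_succeeds:
            kafka_committed.update(self.checkpoint)

    def restore(self):
        # Recovery uses the checkpoint only; committed offsets are never consulted.
        self.offsets = dict(self.checkpoint)


kafka_committed = {}   # stands in for the group's committed offsets in Kafka
task = SourceSubtask([0, 1])

task.read(0, 10)
task.take_checkpoint(kafka_committed)                         # commit reaches Kafka
task.read(0, 5)
task.take_checkpoint(kafka_committed, commit_succeeds=False)  # commit is lost
task.read(1, 3)        # progress made after the last checkpoint
task.restore()         # simulated failure and recovery

print(task.offsets)      # {0: 15, 1: 0}
print(kafka_committed)   # {0: 10, 1: 0}
```

After recovery the subtask resumes from the checkpoint, while an outside observer reading Kafka still sees the older committed offset for partition 0.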

Hope this provides a bit more insight.

Cheers,
Gordon


Re: Why FlinkKafkaConsumer doesn't subscribe to topics?

Renjie Liu
Hi, Julio:
Is checkpointing enabled in your job? The Flink Kafka connector only commits offsets when checkpointing is enabled.
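
For context, an external monitor can only compute lag for partitions that have a committed offset in Kafka. As far as the Flink documentation describes it, the connector commits offsets when a checkpoint completes if checkpointing is enabled, and otherwise falls back to the Kafka client's periodic auto-commit settings. The sketch below shows the lag arithmetic only; `consumer_lag` and the sample offsets are made up for illustration, and how the two offset maps are fetched is left out.

```python
def consumer_lag(end_offsets, committed_offsets):
    """Per-partition lag: log end offset minus the group's committed offset.

    Returns None for a partition with no committed offset, since lag
    cannot be determined from the outside in that case.
    """
    lag = {}
    for partition, end in end_offsets.items():
        committed = committed_offsets.get(partition)
        lag[partition] = None if committed is None else end - committed
    return lag


end_offsets = {0: 120, 1: 80}   # latest offset per partition (sample values)
committed = {0: 100}            # partition 1 was never committed

print(consumer_lag(end_offsets, committed))   # {0: 20, 1: None}
```

If nothing is ever committed, every entry is None, which matches the "no visibility" symptom described at the start of the thread.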
