Re: Telling if a job has caught up with Kafka

Posted by rmetzger0 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-tp12261p12350.html

Sorry for joining this discussion late, but there is already a metric for the offset lag in our 0.9+ consumers.
It's called "records-lag-max" (https://kafka.apache.org/documentation/#new_consumer_fetch_monitoring), and it's exposed via Flink's metrics system.
The only limitation is that it shows just the maximum lag across all partitions, not per-partition metrics.
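As a rough illustration of the difference between a max-lag metric and per-partition lag, here is a minimal Java sketch. It assumes lag is measured as log-end offset minus the consumer's current position. The class and method names (`PartitionLag`, `perPartitionLag`, `maxLag`) are hypothetical, and partitions are plain strings rather than Kafka `TopicPartition` objects so that it runs without the Kafka client. Note that Kafka's actual "records-lag-max" is the maximum observed over a sampling window, whereas this sketch computes a single snapshot.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flink or Kafka): computes consumer lag per
 * partition and the maximum across partitions. The maximum is a snapshot
 * analogue of what "records-lag-max" reports; the per-partition map is the
 * extra detail that metric does not expose.
 */
final class PartitionLag {

    /** Lag of one partition: records between the consumer's position and the log end. */
    static long lag(long logEndOffset, long currentPosition) {
        // Clamp at zero in case the two offsets were sampled at slightly different times.
        return Math.max(0L, logEndOffset - currentPosition);
    }

    /** Per-partition lag for every partition that appears in both maps. */
    static Map<String, Long> perPartitionLag(Map<String, Long> logEndOffsets,
                                             Map<String, Long> positions) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : positions.entrySet()) {
            Long end = logEndOffsets.get(e.getKey());
            if (end != null) {
                result.put(e.getKey(), lag(end, e.getValue()));
            }
        }
        return result;
    }

    /** Maximum lag across partitions: the single value a max-lag metric would show. */
    static long maxLag(Map<String, Long> perPartition) {
        long max = 0L;
        for (long l : perPartition.values()) {
            max = Math.max(max, l);
        }
        return max;
    }

    public static void main(String[] args) {
        Map<String, Long> ends = new HashMap<>();
        ends.put("events-0", 1_000L);
        ends.put("events-1", 1_500L);
        Map<String, Long> positions = new HashMap<>();
        positions.put("events-0", 990L);
        positions.put("events-1", 1_200L);

        Map<String, Long> lags = perPartitionLag(ends, positions);
        System.out.println("events-0 lag = " + lags.get("events-0"));
        System.out.println("events-1 lag = " + lags.get("events-1"));
        System.out.println("max lag = " + maxLag(lags));
    }
}
```

With a real client, the two input maps could be filled from `KafkaConsumer#position` and, from Kafka client 0.10.1 onwards, `KafkaConsumer#endOffsets`.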

On Mon, Mar 20, 2017 at 3:43 PM, Bruno Aranda <[hidden email]> wrote:
Hi,

Thanks! The proposal sounds very good to us too.

Bruno

On Sun, 19 Mar 2017 at 10:57 Florian König <[hidden email]> wrote:
Thanks Gordon for the detailed explanation! That makes sense and explains the expected behaviour.

The JIRA for the new metric also sounds very good. I can’t wait to have this in the Flink GUI (KafkaOffsetMonitor has some problems and stops working after 1-2 days; I don’t know the reason yet).

All the best,
Florian


> On 18.03.2017 at 08:38, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
>
> @Florian
> The 0.9 / 0.10 and 0.8 versions currently behave a bit differently when it comes to offset committing.
>
> In 0.9 / 0.10, if checkpointing is enabled, the “auto.commit.enable” etc. settings are completely ignored and overwritten before being used to instantiate the internal Kafka clients, so committing only happens on Flink checkpoints.
>
> In 0.8, this isn’t the case: both automatic periodic committing and committing on checkpoints can take place. That’s perhaps why you observe the 0.8 consumer committing more frequently.
>
> FYI: This behaviour will be unified in Flink 1.3.0. If you’re interested, you can take a look at https://github.com/apache/flink/pull/3527.
>
> - Gordon
>
>
> On March 17, 2017 at 6:07:38 PM, Florian König ([hidden email]) wrote:
>
>> Why is that so? The checkpoint contains the Kafka offset and would be able to start reading wherever it left off, regardless of any offset stored in Kafka or ZooKeeper. Why is the offset not committed regularly, independently of checkpointing? Or did I misconfigure something?
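To summarize Gordon's explanation, the following hypothetical Java model (not Flink code) encodes which mechanisms commit offsets for each connector version before Flink 1.3.0. The enum and method names are invented for illustration, and the branch for 0.9 / 0.10 without checkpointing (Kafka's own auto-commit applies) is an inference from the thread rather than something stated in it. The Kafka property names are real: the 0.9+ consumer uses `enable.auto.commit`, while the 0.8 consumer uses `auto.commit.enable`, and both default to `true`.

```java
import java.util.EnumSet;
import java.util.Properties;
import java.util.Set;

/**
 * Hypothetical model (not Flink code) of when Flink's Kafka consumers commit
 * offsets back to Kafka/ZooKeeper before Flink 1.3.0, as described in this thread.
 */
final class OffsetCommitModel {

    enum ConnectorVersion { KAFKA_08, KAFKA_09_OR_010 }

    enum CommitTrigger { PERIODIC_AUTO_COMMIT, ON_CHECKPOINT }

    /** Reads the auto-commit switch; the property name differs between the 0.8 and 0.9+ consumers. */
    static boolean autoCommitConfigured(ConnectorVersion version, Properties props) {
        String key = version == ConnectorVersion.KAFKA_08 ? "auto.commit.enable" : "enable.auto.commit";
        // Both Kafka consumers default to auto-committing when the property is absent.
        return Boolean.parseBoolean(props.getProperty(key, "true"));
    }

    /** Returns the set of mechanisms that commit offsets for the given setup. */
    static Set<CommitTrigger> commitTriggers(ConnectorVersion version,
                                             boolean checkpointingEnabled,
                                             Properties props) {
        Set<CommitTrigger> triggers = EnumSet.noneOf(CommitTrigger.class);
        if (checkpointingEnabled) {
            // Both connector versions commit offsets on Flink checkpoints.
            triggers.add(CommitTrigger.ON_CHECKPOINT);
        }
        // 0.9 / 0.10: with checkpointing on, the auto-commit settings are overwritten and ignored.
        // 0.8: periodic committing still happens alongside checkpoint commits.
        boolean autoCommitHonoured = version == ConnectorVersion.KAFKA_08 || !checkpointingEnabled;
        if (autoCommitHonoured && autoCommitConfigured(version, props)) {
            triggers.add(CommitTrigger.PERIODIC_AUTO_COMMIT);
        }
        return triggers;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.enable", "true");

        System.out.println("0.9/0.10 + checkpointing: "
                + commitTriggers(ConnectorVersion.KAFKA_09_OR_010, true, props));
        System.out.println("0.8 + checkpointing: "
                + commitTriggers(ConnectorVersion.KAFKA_08, true, props));
    }
}
```

In either case, the offsets a job resumes from after a failure come from the checkpoint, as Florian notes; the committed offsets mainly make progress visible to external tools such as KafkaOffsetMonitor.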