Hello,
Our team is testing reactive mode and replacing FlinkKafkaConsumer with the new KafkaSource, but we can't find a list of the KafkaSource metrics. Does anyone know where to find one? In our case, we want to monitor the Kafka consume delay (lag) and consume rate.
Thanks,
Oscar
|
Use the following metrics, respectively:
flink_taskmanager_job_task_operator_KafkaConsumer_bytes_consumed_rate - Consumer rate
flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max - Consumer lag
flink_taskmanager_job_task_operator_KafkaConsumer_commit_latency_max - Commit latency
Unsure if reactive mode makes any difference.
On Mon, May 24, 2021 at 7:44 PM 陳樺威 <[hidden email]> wrote:
------------------------------ IMPORTANT: The contents of this email and any attachments are confidential and protected by applicable laws. If you have received this email by mistake, please (i) notify the sender immediately; (ii) delete it from your database; and (iii) do not disclose the contents to anyone or make copies thereof. Razorpay accepts no liability caused due to any inadvertent/ unintentional data transmitted through this email. ------------------------------
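The three names above look like Kafka consumer metrics (`bytes-consumed-rate`, `records-lag-max`, `commit-latency-max`) exposed through a Prometheus-style reporter. As a rough sketch, the mapping can be written as below. The prefix is copied from the names quoted in this thread; the character-replacement rule is an assumption about how the reporter sanitizes names, and `KafkaMetricNames` and `toPrometheusName` are hypothetical helpers, not part of Flink.

```scala
object KafkaMetricNames {
  // Prefix copied from the metric names quoted in this thread.
  val Prefix = "flink_taskmanager_job_task_operator_KafkaConsumer_"

  // Assumption: every character outside [a-zA-Z0-9_] is replaced with '_'.
  def toPrometheusName(kafkaMetric: String): String =
    Prefix + kafkaMetric.replaceAll("[^a-zA-Z0-9_]", "_")

  def main(args: Array[String]): Unit =
    Seq("bytes-consumed-rate", "records-lag-max", "commit-latency-max")
      .map(toPrometheusName)
      .foreach(println)
}
```

This only helps with finding the names on a dashboard; it does not make the metrics appear if the source does not register them.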
|
Hi Ardhani,
Thank you for your kind reply. Our team used the metrics you listed before, but they disappeared after we migrated to the new KafkaSource. We initialize the KafkaSource with the following code.
```
val consumer: KafkaSource[T] = KafkaSource.builder()
```
Oscar
On Tue, May 25, 2021 at 12:08 AM, Ardhani Narasimha <[hidden email]> wrote:
|
Hi Oscar,
Thanks for raising this problem! Currently, KafkaSource does not register the KafkaConsumer metrics in Flink the way FlinkKafkaConsumer does. A ticket has been created on JIRA, and hopefully we can fix it in the next release.
https://issues.apache.org/jira/browse/FLINK-22766
--
Best Regards,
Qingsheng Ren
Email: [hidden email]
On May 25, 2021, 2:35 PM +0800, 陳樺威 <[hidden email]>, wrote:
Hi Ardhani,
```
val consumer: KafkaSource[T] = KafkaSource.builder()
  .setProperties(properties)
  .setTopics(topic)
  .setValueOnlyDeserializer(deserializer)
  .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
  .build()

env
  .fromSource(consumer, WatermarkStrategy.noWatermarks(), uid)
  .setParallelism(math.min(parallelism, env.getParallelism))
  .setMaxParallelism(parallelism)
  .name(uid).uid(uid)
  .rebalance
```
On Tue, May 25, 2021 at 12:08 AM, Ardhani Narasimha <[hidden email]> wrote:
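Until the ticket is resolved, one possible stopgap for the "consume delay" question is to compute lag outside Flink, as the log end offset minus the committed offset per partition. The sketch below only shows that arithmetic. `ConsumerLag`, `lagPerPartition`, and the sample topic name are hypothetical, and how the two offset maps are fetched (for example through a Kafka admin client) is deliberately left out. It also assumes that a partition with no committed offset counts from 0, which may not match your retention settings.

```scala
object ConsumerLag {
  // Hypothetical partition key: (topic, partition number).
  type Partition = (String, Int)

  /** Per-partition lag: log end offset minus committed offset, floored at 0. */
  def lagPerPartition(
      endOffsets: Map[Partition, Long],
      committedOffsets: Map[Partition, Long]
  ): Map[Partition, Long] =
    endOffsets.map { case (tp, end) =>
      // Assumption: a partition without a committed offset is counted from 0.
      val committed = committedOffsets.getOrElse(tp, 0L)
      tp -> math.max(0L, end - committed)
    }

  def main(args: Array[String]): Unit = {
    // Made-up sample offsets, for illustration only.
    val end = Map(("events", 0) -> 120L, ("events", 1) -> 80L)
    val committed = Map(("events", 0) -> 100L)
    val lag = lagPerPartition(end, committed)
    println(lag)
    println(s"max lag: ${lag.values.max}")
  }
}
```

Since offsets are typically committed back to Kafka only when a checkpoint completes, a lag figure derived this way is likely to be coarser than the `records_lag_max` metric mentioned earlier in the thread.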
|
In reply to this post by Ardhani Narasimha Swamy
Looks like when KafkaSource is used instead of FlinkKafkaConsumer, the metrics listed below are not available. Bug? Work in progress?
Thanks,
Alexey
From: Ardhani Narasimha <[hidden email]>
Sent: Monday, May 24, 2021 9:08 AM
To: 陳樺威 <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: KafkaSource metrics
Use the following metrics, respectively:
flink_taskmanager_job_task_operator_KafkaConsumer_bytes_consumed_rate - Consumer rate
flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max - Consumer lag
flink_taskmanager_job_task_operator_KafkaConsumer_commit_latency_max - Commit latency
Unsure if reactive mode makes any difference.
On Mon, May 24, 2021 at 7:44 PM 陳樺威 <[hidden email]> wrote:
|