Customized Metric Reporter can not be found by Flink

Posted by Fan Xie on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Customized-Metric-Reporter-can-not-be-found-by-Flink-tp43656.html

Hi Flink Community,

Recently I implemented a customized metric reporter (named: DiagnosticsMessageReporter) to report Flink metrics to a Kafka topic. I built this reporter into a jar file and copy it to /opt/flink/plugins/DiagnosticsMessageReporter/DiagnosticsMessageReporter.jar for both the Job Manager and task manager's containers. But later on I found the following logs indicated that the metric reporter can not be loaded:

2021-05-11 23:08:31,523 WARN  org.apache.flink.runtime.metrics.ReporterSetup               [] - The reporter factory (org.apache.flink.metrics.reporter.DiagnosticsMessageReporterFactory) could not be found for reporter DiagnosticsMessageReporter. Available factories: [org.apache.flink.metrics.datadog.DatadogHttpReporterFactory, org.apache.flink.metrics.slf4j.Slf4jReporterFactory, org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory, org.apache.flink.metrics.graphite.GraphiteReporterFactory, org.apache.flink.metrics.statsd.StatsDReporterFactory, org.apache.flink.metrics.prometheus.PrometheusReporterFactory, org.apache.flink.metrics.jmx.JMXReporterFactory, org.apache.flink.metrics.influxdb.InfluxdbReporterFactory].
2021-05-11 23:21:55,698 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl          [] - No metrics reporter configured, no metrics will be exposed/reported.

The Flink configs I used are as following:
#DiagnosticsMessageReporter configs
metrics.reporters: DiagnosticsMessageReporter
metrics.reporter.DiagnosticsMessageReporter.factory.class: org.apache.flink.metrics.reporter.DiagnosticsMessageReporterFactory
metrics.reporter.DiagnosticsMessageReporter.bootstrap.servers: kafka:9092
metrics.reporter.DiagnosticsMessageReporter.topic: flink-metrics
metrics.reporter.DiagnosticsMessageReporter.keyBy: task_attempt_id
metrics.reporter.DiagnosticsMessageReporter.interval: 1 SECONDS
Does anyone have any idea about what happened here? Am I missing some of the steps to load the customized reporter as a plugin? Really appreciate if someone can help to take a look at this! 

Best,
Fan