Hi guys, I'm retrying to send some app related custom metrics from Flink to Datadog via StatsD.
// flink-conf.yaml
metrics.reporters: stsd
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125
metrics.scope.jm.job: bowen.jobmanager.bowen.job_name
metrics.scope.tm.job: bowen.taskmanager.<tm_id>.<job_name>
metrics.scope.task: bowen.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
metrics.scope.operator: bowen.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
// test code, by modifying WordCount example
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
private Counter counter;
@Override
public void open(Configuration config) {
getRuntimeContext()
.getMetricGroup()
.addGroup("bowen.test")
.gauge("bowen.test.flink", new Gauge<Integer>() {
@Override
public Integer getValue() {
return 100;
}
}); // test custom metrics
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("bowen.test")
.counter("bowen.flink.metric"); // test custom metrics
counter.inc(100);
}
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
throws Exception {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
counter.inc(100);
}
}
I found my Datadog received all system scope metrics, but non of my custom metric. I researched all night but gained no progress. What did I do wrong? Flink is able to handle custom metrics right? I'd really appreciate some guidance on sending custom metrics!
Thank you very much!
Bowen