In one flink operator, i want to initialize multiple flink metrics according to message content. As the code below. public void invoke(ObjectNode node, Context context) throws Exception { String tableName = node.get("metadata").get("topic").asText(); Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10)); meter.markEvent(); log.info("### counter: " + meter.toString() + "\t" + meter.getCount()); But in this way every invoke call will initialize a new metrics and the count will be from zero again. How can i reuse the metric initialized before? Thanks, Lei |
Hi Lei, I think you should initialize the metric in the `open` method. Then you can save the initialized metric as a class field, and update it in the `invoke` method for each record. Thank you~ Xintong Song
|
Hi Xintong, Yes, initializing the metric in the `open` method works, but it doesn't solve my problem. I want to initialize the metric with a name that is extracted from the record content. Only in the `invoke` method i can do it. Actually my scenario is as follows. The record is MySQL binlog info. I want to monitor the qps by tableName. The tableName is different for every record. Thanks, Lei
|
Seems there's no direct solution. Perhaps i can implement this by initializing a HashMap<String, Meter> with all the possible value of tableName in `open` mehtod and get the corresponding Meter according to tableName in the `invoke` method. Thanks, Lei
|
Ok, I see your problem. And yes, keeping a map of metrics should work. Just for double checking, I assume there's an upper bound of your map keys (table names)? Because if not, an infinitely increasing in-memory map that is not managed by Flink's state might become problematic. Thank you~ Xintong Song
|
Free forum by Nabble | Edit this page |