How to dynamically initialize flink metrics in invoke method and then reuse it?


wanglei2@geekplus.com.cn

In one Flink operator, I want to initialize multiple Flink metrics according to the message content, as in the code below.

public void invoke(ObjectNode node, Context context) throws Exception {
    // The metric name comes from the record itself.
    String tableName = node.get("metadata").get("topic").asText();
    // Problem: a fresh MeterView is created on every call.
    Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10));
    meter.markEvent();
    log.info("### counter: " + meter.toString() + "\t" + meter.getCount());
}


But this way, every `invoke` call initializes a new metric, and the count starts from zero again.
How can I reuse the metric that was initialized before?

Thanks,
Lei

Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

Xintong Song
Hi Lei,

I think you should initialize the metric in the `open` method. Then you can save the initialized metric as a class field, and update it in the `invoke` method for each record.
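
A minimal sketch of that pattern, for illustration only. To keep it runnable without Flink on the classpath, `StubMeter` and `FixedMeterSink` are hypothetical stand-ins, not Flink classes; in a real `RichSinkFunction` the meter would presumably be obtained in `open` via `getRuntimeContext().getMetricGroup().meter(...)`, as in the original post.

```java
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for Flink's Meter (assumption: only markEvent/getCount are needed here).
class StubMeter {
    private final AtomicLong count = new AtomicLong();
    void markEvent() { count.incrementAndGet(); }
    long getCount() { return count.get(); }
}

// Hypothetical sink that mirrors the open()/invoke() lifecycle of a rich function.
class FixedMeterSink {
    private StubMeter meter; // created once, reused for every record

    // In real Flink code the metric would be registered here,
    // e.g. with getRuntimeContext().getMetricGroup().meter(name, new MeterView(10)).
    void open() {
        meter = new StubMeter();
    }

    // Called once per record: only updates the existing meter.
    void invoke(String record) {
        meter.markEvent();
    }

    long count() {
        return meter.getCount();
    }
}

class FixedMeterDemo {
    public static void main(String[] args) {
        FixedMeterSink sink = new FixedMeterSink();
        sink.open();
        sink.invoke("a");
        sink.invoke("b");
        System.out.println(sink.count()); // prints 2
    }
}
```

Because the meter is a field, the count carries over between `invoke` calls. The limitation is that the metric name has to be known when `open` runs.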

Thank you~

Xintong Song



On Fri, Jul 3, 2020 at 11:50 AM [hidden email] <[hidden email]> wrote:


Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

wanglei2@geekplus.com.cn
Hi Xintong, 

Yes, initializing the metric in the `open` method works, but it doesn't solve my problem.
I want to initialize the metric with a name that is extracted from the record content, and I can only do that in the `invoke` method.

My actual scenario is as follows.
Each record is MySQL binlog data. I want to monitor the QPS per table name, and the table name can differ from record to record.

Thanks,
Lei



Send Time: 2020-07-03 13:14
Receiver: [hidden email]
Subject: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

wanglei2@geekplus.com.cn

It seems there's no direct solution.
Perhaps I can implement this by initializing a HashMap<String, Meter> with all the possible values of tableName in the `open` method and then looking up the corresponding Meter by tableName in the `invoke` method.
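
Roughly what I have in mind, as an untested-against-Flink sketch. `TableMeter` and `PreRegisteredMeterSink` are hypothetical stand-ins so the example runs on its own, and the list of table names is assumed to be known up front; in the real sink each map value would be a Flink `Meter` registered on the metric group.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for Flink's Meter, used only so the sketch runs without Flink.
class TableMeter {
    private long count;
    void markEvent() { count++; }
    long getCount() { return count; }
}

// Hypothetical sink: one meter per table name, all created in open().
class PreRegisteredMeterSink {
    private final List<String> knownTables; // assumption: known before the job starts
    private final Map<String, TableMeter> meters = new HashMap<>();

    PreRegisteredMeterSink(List<String> knownTables) {
        this.knownTables = knownTables;
    }

    // Register one meter per known table name.
    void open() {
        for (String table : knownTables) {
            meters.put(table, new TableMeter());
        }
    }

    // Look up the meter by table name; returns false if the table was not pre-registered.
    boolean invoke(String tableName) {
        TableMeter meter = meters.get(tableName);
        if (meter == null) {
            return false;
        }
        meter.markEvent();
        return true;
    }

    long countFor(String tableName) {
        TableMeter meter = meters.get(tableName);
        return meter == null ? 0 : meter.getCount();
    }
}

class PreRegisteredMeterDemo {
    public static void main(String[] args) {
        PreRegisteredMeterSink sink =
                new PreRegisteredMeterSink(Arrays.asList("orders", "users"));
        sink.open();
        sink.invoke("orders");
        sink.invoke("orders");
        sink.invoke("users");
        System.out.println(sink.countFor("orders")); // prints 2
    }
}
```

The drawback is that records from a table missing from the list are not counted at all.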


Thanks,
Lei 

 
Send Time: 2020-07-03 14:27
Receiver: [hidden email]
Subject: Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

Xintong Song
OK, I see your problem. And yes, keeping a map of metrics should work.

Just to double-check: I assume there's an upper bound on your map keys (table names)?
If not, an ever-growing in-memory map that is not managed by Flink's state might become problematic.
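
One way to keep such a map bounded, sketched under assumptions: meters are created lazily the first time a table name is seen, and once a cap is reached, further new names share a single overflow meter. `LazyMeter`, `LazyMeterSink`, the cap, and the `"other"` bucket name are all hypothetical; with Flink, the place where `new LazyMeter()` is called is where the metric would be registered on the metric group.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for Flink's Meter, used only so the sketch runs without Flink.
class LazyMeter {
    private long count;
    void markEvent() { count++; }
    long getCount() { return count; }
}

// Hypothetical sink: meters are created on first use and cached by table name.
class LazyMeterSink {
    private static final String OVERFLOW = "other"; // hypothetical shared bucket
    private final int maxTables;                    // hypothetical cap on distinct names
    private final Map<String, LazyMeter> meters = new HashMap<>();

    LazyMeterSink(int maxTables) {
        this.maxTables = maxTables;
    }

    void invoke(String tableName) {
        LazyMeter meter = meters.get(tableName);
        if (meter == null) {
            // New name: register it while under the cap, otherwise fall back to the overflow bucket.
            String key = meters.size() < maxTables ? tableName : OVERFLOW;
            meter = meters.computeIfAbsent(key, k -> new LazyMeter());
        }
        meter.markEvent();
    }

    long countFor(String tableName) {
        LazyMeter meter = meters.get(tableName);
        return meter == null ? 0 : meter.getCount();
    }

    int size() {
        return meters.size();
    }
}

class LazyMeterDemo {
    public static void main(String[] args) {
        LazyMeterSink sink = new LazyMeterSink(2);
        for (String t : new String[] {"orders", "orders", "users", "logs", "audit"}) {
            sink.invoke(t);
        }
        System.out.println(sink.countFor("orders")); // prints 2
        System.out.println(sink.countFor("other"));  // prints 2 (logs and audit share the bucket)
    }
}
```

With this shape the map never holds more than `maxTables + 1` entries, at the cost of losing per-table detail for names that arrive after the cap is hit.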

Thank you~

Xintong Song



On Fri, Jul 3, 2020 at 2:39 PM [hidden email] <[hidden email]> wrote:
