Issue with counter metrics for large number of keys


Issue with counter metrics for large number of keys

Gaurav Luthra
I want a new counter for every key of my windowed stream, and I want the same counter to be incremented when the same key appears in multiple incoming events.

So, for every incoming event I would run the code below:
getRuntimeContext().getMetricGroup().counter(myKey).inc();

But the code above fails when the same value of myKey arrives again; this is a limitation of Flink metrics. It throws the exception "Name collision: Adding a metric with the same name as a metric subgroup".

Note: myKey is of String type and can have a different value for every incoming event.
In my case I expect 100 thousand distinct values of myKey.

Now, to work around the issue, I have to keep references for all 100 thousand values of myKey in some data structure, e.g.
 Map<String, Counter> myMetricMap;
and do the following for every myKey:

        Counter counter = myMetricMap.get(myKey);
        if (counter == null)
        {
            counter = getRuntimeContext().getMetricGroup().counter(myKey);
            myMetricMap.put(myKey, counter);
        }
        counter.inc();
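For what it's worth, the lookup-then-register pattern can be collapsed with Map.computeIfAbsent. The sketch below is runnable outside Flink because it uses a minimal stand-in Counter class in place of org.apache.flink.metrics.Counter; the real metric registration is only indicated in a comment:

```java
import java.util.HashMap;
import java.util.Map;

public class CounterMapSketch {
    // Minimal stand-in for org.apache.flink.metrics.Counter.
    static class Counter {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }

    private final Map<String, Counter> myMetricMap = new HashMap<>();

    Counter countKey(String myKey) {
        // On first sight of a key, create the counter; in real code this is also
        // where it would be registered, e.g.
        // getRuntimeContext().getMetricGroup().counter(myKey, counter);
        Counter counter = myMetricMap.computeIfAbsent(myKey, k -> new Counter());
        counter.inc();
        return counter;
    }

    public static void main(String[] args) {
        CounterMapSketch sketch = new CounterMapSketch();
        sketch.countKey("a");
        sketch.countKey("b");
        Counter a = sketch.countKey("a");
        System.out.println(a.getCount()); // key "a" was seen twice
    }
}
```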
The code above serves my purpose, but I do not want to maintain a map of 100 thousand values of myKey.

Is there any alternate solution? I am looking for a way to achieve the same functionality, maintaining approximately 100 thousand counter metrics, without keeping their references in a map (or any other data structure).


Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206

Re: Issue with counter metrics for large number of keys

Ken Krugler
Hi Gaurav,

I’ve used a few hundred counters before without problems. My concern about > 100K unique counters is that you wind up generating load (and maybe memory issues) on the JobManager.

E.g. with Hadoop’s metrics system, trying to go much beyond 1000 counters could cause significant problems. IIRC it was due to the JobTracker getting bogged down processing too many counter updates, and/or running out of memory. It’s possible more recent versions of Hadoop no longer have that problem.

But someone on the Flink dev team should weigh in here…

— Ken


On Jan 16, 2019, at 7:45 PM, Gaurav Luthra <[hidden email]> wrote:

Thanks a lot, Ken, for your inputs.

I will look into your suggested solution and report back on it.
Also, I want to know: roughly how many counter metrics is it reasonable to keep references to?
Or, what is the maximum number of counter metric references you have heard of anyone using?

Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206


On Thu, Jan 17, 2019 at 9:04 AM Ken Krugler <[hidden email]> wrote:
I think trying to use counters to track counts for 100K+ keys is not going to be a good idea.

An alternative is to have a small function with managed MapState, and make that state queryable.

Though maybe under the hood that’s what metrics is doing anyway :)

— Ken
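One possible shape of that suggestion, counting per key with Flink keyed state made queryable. Since the stream is keyed, per-key ValueState is used here (MapState would be the analogous choice if several sub-keys per Flink key were needed). This is a sketch, not runnable standalone: it assumes Flink on the classpath, and the state name "per-key-count" is illustrative.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PerKeyCounter extends KeyedProcessFunction<String, String, Tuple2<String, Long>> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> desc =
            new ValueStateDescriptor<>("per-key-count", Long.class);
        // Expose the state to Flink's queryable state client.
        desc.setQueryable("per-key-count");
        count = getRuntimeContext().getState(desc);
    }

    @Override
    public void processElement(String key, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value();
        long updated = (current == null) ? 1L : current + 1L;
        count.update(updated);
        out.collect(Tuple2.of(key, updated));
    }
}
```

Usage would be along the lines of events.keyBy(k -> k).process(new PerKeyCounter()); the state backend then tracks one long per key instead of one metric object per key.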

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Issue with counter metrics for large number of keys

Gaurav Luthra
Hi Ken,

Thanks for your inputs again.
I will wait for the Flink folks to come back with a suggestion for implementing 100K unique counters.
For the time being, I will make the number of counter metrics a configurable parameter in my application, so the user knows what he is trying to do.
I will also restrict the maximum value to 1000 so that no memory mishap happens, and tune this limit against the memory of the JobManager and my application.

And I will try to explore other solutions in Flink.

Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206



Re: Issue with counter metrics for large number of keys

Zhenghua Gao
In reply to this post by Gaurav Luthra
So what you want is a count for every key?
Why not use a count aggregation?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Issue with counter metrics for large number of keys

Jamie Grier-2
+1 to what Zhenghua said.  You're abusing the metrics system, I think.  Rather, just do a stream.keyBy().sum() and then write a Sink to do something with the data -- for example, push it to your metrics system if you wish.
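As a plain-Java batch analogy (no Flink involved) of what such a keyed sum produces, grouping events by key yields one running count per distinct key rather than one metric object per key:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class KeyCountDemo {
    public static void main(String[] args) {
        // A handful of illustrative event keys.
        List<String> events = List.of("a", "b", "a", "c", "a", "b");

        // Equivalent in spirit to stream.keyBy(k -> k).sum(...):
        // the result is one count per key, however many keys there are.
        Map<String, Long> counts = events.stream()
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        System.out.println(counts.get("a") + " " + counts.get("b") + " " + counts.get("c"));
    }
}
```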

However, from experience, many metrics systems don't like that sort of thing.  High cardinality metrics are usually a problem so I would expect to run into issues depending on what metrics system you are trying to get the data into.

-Jamie
