I want a new counter for every key of my windowed stream, and I want the same counter to be incremented when the same key arrives in multiple incoming events. So, for every incoming event I call:

    getRuntimeContext().getMetricGroup().counter(myKey).inc();

But the code above fails when the same value of myKey arrives again. This is a limitation of Flink metrics: it throws the exception "Name collision: Adding a metric with the same name as a metric subgroup".

Note: myKey is of String type and can have a different value for every incoming event. In my case I expect around 100 thousand different values of myKey.

To work around the issue, I have to keep references to all 100 thousand values of myKey in some data structure, e.g. Map<String, Counter> myMetricMap, and for every myKey do the following:

    Counter counter = myMetricMap.get(myKey);
    if (null == counter) {
        counter = new SimpleCounter();
        counter.inc();
        myMetricMap.put(myKey, counter);
        getRuntimeContext().getMetricGroup().counter(myKey, counter);
    } else {
        counter.inc();
    }

The code above serves my purpose, but I do not want to maintain a map of 100 thousand values of myKey. Is there an alternate solution? I am looking for a way to maintain approx. 100 thousand counter metrics without keeping references to them in a map (or any other data structure).

Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206
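[For reference, the workaround above as a self-contained sketch using Flink's DataStream API; the Event type and its getKey() accessor are illustrative placeholders, not from the original post:]

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.util.Collector;

    public class PerKeyCounterFunction extends RichFlatMapFunction<Event, Event> {

        // One registered Counter per distinct key, cached so that the same
        // metric name is never registered twice (registering a duplicate name
        // is what triggers the "Name collision" exception described above).
        private transient Map<String, Counter> countersByKey;

        @Override
        public void open(Configuration parameters) {
            countersByKey = new HashMap<>();
        }

        @Override
        public void flatMap(Event event, Collector<Event> out) {
            String myKey = event.getKey();  // hypothetical accessor
            Counter counter = countersByKey.get(myKey);
            if (counter == null) {
                // counter(name) registers and returns a new SimpleCounter
                counter = getRuntimeContext().getMetricGroup().counter(myKey);
                countersByKey.put(myKey, counter);
            }
            counter.inc();
            out.collect(event);
        }
    }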
Hi Gaurav,
I’ve used a few hundred counters before without problems. My concern about > 100K unique counters is that you wind up generating load (and maybe memory issues) for the JobManager. E.g. with Hadoop’s metrics system, trying to go much beyond 1000 counters could cause significant problems. IIRC it was due to the JobTracker getting bogged down processing too many counter updates, and/or running out of memory. It’s possible more recent versions of Hadoop no longer have that problem. But someone on the Flink dev team should weigh in here… — Ken
--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
Hi Ken,

Thanks for your inputs again. I will wait for the Flink team to come back to me with a suggestion for implementing 100K unique counters.

For the time being, I will make the number of counter metrics a configurable parameter in my application, so the user will know what he is trying to do. I will restrict the maximum value to 1000 so that no memory mishap happens, and will tune this maximum against the JobManager's memory and my application. And I will keep exploring other solutions in Flink.

Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206
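[A minimal sketch of that capped registration, assuming the limit lives in a maxCounters field populated from the application's configuration; all names here are illustrative, not from the thread:]

    // Fields inside a RichFunction subclass; maxCounters would be read from
    // the application's configuration, e.g. capped at 1000 as described above.
    private final int maxCounters;
    private final Map<String, Counter> countersByKey = new HashMap<>();

    private void incrementCounter(String myKey) {
        Counter counter = countersByKey.get(myKey);
        if (counter == null) {
            if (countersByKey.size() >= maxCounters) {
                return;  // cap reached: silently drop counts for new keys
            }
            counter = getRuntimeContext().getMetricGroup().counter(myKey);
            countersByKey.put(myKey, counter);
        }
        counter.inc();
    }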
So what you want is the count of every key?
Why didn't you use a count aggregation?
+1 to what Zhenghua said. You're abusing the metrics system, I think. Rather, just do a stream.keyBy().sum() and then write a Sink to do something with the data, for example push it to your metrics system if you wish.

However, from experience, many metrics systems don't like that sort of thing. High-cardinality metrics are usually a problem, so I would expect to run into issues depending on which metrics system you are trying to get the data into.

-Jamie
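[A rough sketch of that keyBy().sum() approach; the Event type, its getKey() accessor, and the sink body are hypothetical placeholders:]

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    // Count events per key with ordinary keyed state instead of metrics.
    DataStream<Tuple2<String, Long>> counts = events
            .map(e -> Tuple2.of(e.getKey(), 1L))
            .returns(Types.TUPLE(Types.STRING, Types.LONG))
            .keyBy(t -> t.f0)
            .sum(1);

    // Forward the running per-key counts to an external system instead of
    // registering 100K counters in Flink's own metric registry.
    counts.addSink(new SinkFunction<Tuple2<String, Long>>() {
        @Override
        public void invoke(Tuple2<String, Long> value, Context context) {
            // e.g. push value.f0 (key) and value.f1 (count) to a metrics backend
        }
    });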