I was testing a Flink Gauge metric in my flatMap function. I am just reporting the duration (end time - start time) of a DB operation in the flatMap, but the dashboard only shows the first value instead of the updated value.
I am using the following code in my flatMap:
    getRuntimeContext()
        .getMetricGroup()
        .gauge("DBOpInSec", (Gauge<String>) () -> String.valueOf((endTime - startTime) / 1000)); |
How long does the job run? The metrics are only updated every 10 seconds, and they are not updated when the job finishes.
In reply to sohimankotia's message of 20.06.2017 12:12.
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Gauge-Metric-is-not-getting-updated-on-Job-Dashboard-tp13842.html
Sent from the Apache Flink User Mailing List archive at Nabble.com. |
I ran the job and monitored it for approx. 20 minutes.
I tried a meter, accumulators, a histogram, and a gauge. Of those, only the meter and the accumulators updated their values; the others showed a constant value the whole time. |
Can you provide more of your code (you can also send it to me directly)?
I'm interested in where the startTime/endTime arguments are defined.
In reply to sohimankotia's message of 21.06.2017 10:47. |
Here it is:

import com.codahale.metrics.SlidingWindowReservoir;
import in.dailyhunt.cis.enrichments.datatype.BasicInfoTuple;
import in.dailyhunt.cis.enrichments.datatype.SinkTuple;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.util.Collector;

public class DBFlatMap extends RichFlatMapFunction<BasicInfoTuple, SinkTuple> {

    private transient Meter meter;
    private transient org.apache.flink.metrics.Histogram histogram;

    @Override
    public void open(Configuration parameters) throws Exception {
        /* some app specific code */

        com.codahale.metrics.Meter meter1 = new com.codahale.metrics.Meter();
        this.meter = getRuntimeContext()
                .getMetricGroup()
                .meter("myMeter", new DropwizardMeterWrapper(meter1));

        com.codahale.metrics.Histogram histogram1 =
                new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
        this.histogram = getRuntimeContext()
                .getMetricGroup()
                .histogram("myHistogram", new DropwizardHistogramWrapper(histogram1));
    }

    @Override
    public void flatMap(BasicInfoTuple in, Collector<SinkTuple> out) throws Exception {
        long start = System.currentTimeMillis();
        incrementCounter("input-in-group-and-KeyElect-Flow", this.getRuntimeContext());
        this.dbOperations();
        addMetricData(start, System.currentTimeMillis());
        this.meter.markEvent();
    }

    private void dbOperations() {
        // Db Operation and some app logic
    }

    public void incrementCounter(String counterName, RuntimeContext runtimeContext) {
        if (runtimeContext == null) {
            return;
        }
        LongCounter lc = runtimeContext.getLongCounter(counterName);
        if (lc == null) {
            lc = new LongCounter();
            runtimeContext.addAccumulator(counterName, lc);
        }
        lc.add(1L);
    }

    private void addMetricData(long startTime, long endTime) {
        final long opTimeInSec = (endTime - startTime) / 1000;
        this.histogram.update(opTimeInSec);
        getRuntimeContext().getMetricGroup()
                .gauge("DbOpGauge", (Gauge<String>) () -> String.valueOf(opTimeInSec));
    }
} |
The gauge value is not updating because you are not actually updating the gauge; you are registering a new gauge under the same name on every call. The subsequent registrations are ignored, and should have logged a warning.
I suggest making your gauge stateful by adding a field for opTimeInSec with a setter, which you call in addMetricData(...).
In reply to sohimankotia's message of 21.06.2017 11:48. |
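Editor's note: the behaviour described in this reply can be reproduced without a Flink cluster. The following is a minimal sketch, not Flink code: the nested `Gauge` interface is a stand-in for `org.apache.flink.metrics.Gauge`, and `FirstWinsMetricGroup` is a hypothetical registry that assumes exactly what the reply states, namely that the first gauge registered under a name is kept and later registrations are ignored. The warning text it prints is made up for the sketch.

```java
import java.util.HashMap;
import java.util.Map;

class ReRegistrationDemo {

    // Stand-in for org.apache.flink.metrics.Gauge, reduced to its getValue() method.
    interface Gauge<T> {
        T getValue();
    }

    // Hypothetical registry (an assumption, not Flink's class): keeps the first
    // gauge registered under a name and ignores later ones, as the reply describes.
    static class FirstWinsMetricGroup {
        private final Map<String, Gauge<?>> gauges = new HashMap<>();

        <T> void gauge(String name, Gauge<T> gauge) {
            if (gauges.containsKey(name)) {
                // Illustrative message only; not the text Flink logs.
                System.out.println("WARN: '" + name + "' already registered, ignoring");
                return;
            }
            gauges.put(name, gauge);
        }

        Object read(String name) {
            return gauges.get(name).getValue();
        }
    }

    // Mirrors addMetricData(...) from the thread: a new gauge per call, each one
    // capturing a fixed value in its lambda.
    static void addMetricData(FirstWinsMetricGroup group, long startTime, long endTime) {
        final long opTimeInSec = (endTime - startTime) / 1000;
        group.gauge("DbOpGauge", (Gauge<String>) () -> String.valueOf(opTimeInSec));
    }

    public static void main(String[] args) {
        FirstWinsMetricGroup group = new FirstWinsMetricGroup();
        addMetricData(group, 0L, 2000L); // first registration captures 2
        addMetricData(group, 0L, 7000L); // ignored: the name is already taken
        System.out.println(group.read("DbOpGauge")); // prints 2, the first value
    }
}
```

Under that assumption, every later call produces a gauge that nothing ever reads, which matches the constant value seen on the dashboard.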
So basically, every time I call the addMetricData method, it just registers the gauge again.
I can register the gauge in the open() method and then update its value in flatMap. Right? |
Exactly: you register the gauge once in open(), and modify the code so that this gauge returns different values.
In reply to sohimankotia's message of 21.06.2017 12:04. |
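Editor's note: the agreed fix could look roughly like the sketch below. It is deliberately free of Flink dependencies so it runs on its own; the nested `Gauge` interface stands in for `org.apache.flink.metrics.Gauge`, and `open()` / `addMetricData(...)` only play the roles of the methods in the posted `DBFlatMap`. The class name, the `registeredGauge()` helper, the `Gauge<Long>` type, and the `volatile` field are choices made for this illustration, not something the thread prescribes.

```java
class StatefulGaugeSketch {

    // Stand-in for org.apache.flink.metrics.Gauge (single getValue() method).
    interface Gauge<T> {
        T getValue();
    }

    // Latest DB operation time in seconds. Marked volatile on the assumption that
    // the gauge is read by a different thread than the one processing records.
    private volatile long opTimeInSec;

    // The single gauge instance; held here only so the sketch can read it back.
    private Gauge<Long> dbOpGauge;

    // Plays the role of open(): create and register the gauge exactly once.
    // In the real function this is where
    // getRuntimeContext().getMetricGroup().gauge("DbOpGauge", ...) would go.
    void open() {
        dbOpGauge = () -> opTimeInSec;
    }

    // Plays the role of addMetricData(...): no registration, only a field update.
    void addMetricData(long startTime, long endTime) {
        opTimeInSec = (endTime - startTime) / 1000;
    }

    // Hypothetical accessor used by this sketch instead of a metrics reporter.
    Gauge<Long> registeredGauge() {
        return dbOpGauge;
    }

    public static void main(String[] args) {
        StatefulGaugeSketch fn = new StatefulGaugeSketch();
        fn.open();
        fn.addMetricData(0L, 2000L);
        System.out.println(fn.registeredGauge().getValue()); // prints 2
        fn.addMetricData(0L, 7000L);
        System.out.println(fn.registeredGauge().getValue()); // prints 7
    }
}
```

Because the lambda reads the field each time `getValue()` is called, the one registered gauge reflects whatever the most recent call to `addMetricData(...)` stored.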
Cool, thanks. Closing the thread.
|