kinesis consumer metrics user variables

Posted by Yitzchak Lieberman on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/kinesis-consumer-metrics-user-variables-tp30314.html

Hi.

I would like to have the ability to control the metric group of flink kinesis consumer:
As written below it creates metric identifier for each stream name and shard id (in our case more than 1000 metric identifiers), in such matter it cannot be aggregated in data dog graph
private static ShardMetricsReporter registerShardMetrics(MetricGroup metricGroup, KinesisStreamShardState shardState) {
ShardMetricsReporter shardMetrics = new ShardMetricsReporter();

MetricGroup streamShardMetricGroup = metricGroup
.addGroup(
KinesisConsumerMetricConstants.STREAM_METRICS_GROUP,
shardState.getStreamShardHandle().getStreamName())
.addGroup(
KinesisConsumerMetricConstants.SHARD_METRICS_GROUP,
shardState.getStreamShardHandle().getShard().getShardId());

streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE, shardMetrics::getMillisBehindLatest);
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MAX_RECORDS_PER_FETCH, shardMetrics::getMaxNumberOfRecordsPerFetch);
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_AGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfAggregatedRecords);
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_DEAGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfDeaggregatedRecords);
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.AVG_RECORD_SIZE_BYTES, shardMetrics::getAverageRecordSizeBytes);
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.BYTES_PER_READ, shardMetrics::getBytesPerRead);
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.RUNTIME_LOOP_NANOS, shardMetrics::getRunLoopTimeNanos);
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.LOOP_FREQUENCY_HZ, shardMetrics::getLoopFrequencyHz);
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.SLEEP_TIME_MILLIS, shardMetrics::getSleepTimeMillis);
return shardMetrics;
}
Would be happy for your advice.

Thanks,
Yitzchak.