kinesis consumer metrics user variables

Yitzchak Lieberman
Hi.

I would like to be able to control the metric groups created by the Flink Kinesis consumer.
As shown below, it creates a separate metric identifier for each stream name and shard ID (in our case more than 1000 metric identifiers), so the metrics cannot be aggregated in a Datadog graph:
private static ShardMetricsReporter registerShardMetrics(MetricGroup metricGroup, KinesisStreamShardState shardState) {
   ShardMetricsReporter shardMetrics = new ShardMetricsReporter();

   MetricGroup streamShardMetricGroup = metricGroup
      .addGroup(
         KinesisConsumerMetricConstants.STREAM_METRICS_GROUP,
         shardState.getStreamShardHandle().getStreamName())
      .addGroup(
         KinesisConsumerMetricConstants.SHARD_METRICS_GROUP,
         shardState.getStreamShardHandle().getShard().getShardId());

   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE, shardMetrics::getMillisBehindLatest);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MAX_RECORDS_PER_FETCH, shardMetrics::getMaxNumberOfRecordsPerFetch);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_AGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfAggregatedRecords);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_DEAGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfDeaggregatedRecords);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.AVG_RECORD_SIZE_BYTES, shardMetrics::getAverageRecordSizeBytes);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.BYTES_PER_READ, shardMetrics::getBytesPerRead);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.RUNTIME_LOOP_NANOS, shardMetrics::getRunLoopTimeNanos);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.LOOP_FREQUENCY_HZ, shardMetrics::getLoopFrequencyHz);
   streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.SLEEP_TIME_MILLIS, shardMetrics::getSleepTimeMillis);
   return shardMetrics;
}
I would be happy to hear your advice.

Thanks,
Yitzchak.

Re: kinesis consumer metrics user variables

Chesnay Schepler

What exactly would you prefer? Without the stream name and shard id you'd end up with name clashes all over the place.

Why can you not aggregate them? Surely Datadog supports some way to define a wildcard when defining the tags to aggregate.


On 03/10/2019 09:09, Yitzchak Lieberman wrote:
I would like to have the ability to control the metric group of flink kinesis consumer: [...]

Re: kinesis consumer metrics user variables

Yitzchak Lieberman
Hi. Yes, I would prefer to have an option to leave out the new metric groups.
It shouldn't cause any name clashes, since that information appears in the tags.
For now I have compiled the Flink Kinesis connector with a boolean option to control this.

On Mon, Oct 7, 2019 at 11:05 AM Chesnay Schepler <[hidden email]> wrote:
What exactly would you prefer? [...]

Re: kinesis consumer metrics user variables

Chesnay Schepler
How do the shard name and id appear in the tags when you remove the metric groups?

There should be name clashes within Flink for any consumer that reads from multiple shards, since the metrics for individual shards are no longer uniquely identified.
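
To illustrate the clash, here is a minimal sketch using a toy registry rather than Flink's actual MetricGroup. The class ClashSketch and its methods are hypothetical and exist only for this example; the assumption is that a group holds at most one metric per name, so a second shard registering the same gauge name on the same group is dropped.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy stand-in for a metric group (hypothetical, not Flink's MetricGroup).
public class ClashSketch {
    private final Map<String, Supplier<Long>> gauges = new HashMap<>();

    // Registers a gauge; returns false and keeps the existing gauge
    // when the name is already taken on this group.
    public boolean gauge(String name, Supplier<Long> value) {
        return gauges.putIfAbsent(name, value) == null;
    }

    public int size() {
        return gauges.size();
    }

    public static void main(String[] args) {
        ClashSketch operatorGroup = new ClashSketch();
        // Two shards of the same consumer, with no per-shard sub-group:
        // both try to register under the identical name.
        boolean shard0 = operatorGroup.gauge("millisBehindLatest", () -> 120L);
        boolean shard1 = operatorGroup.gauge("millisBehindLatest", () -> 4500L);
        System.out.println(shard0 + " " + shard1 + " " + operatorGroup.size());
        // prints "true false 1": only the first shard's gauge is kept
    }
}
```

With per-stream and per-shard sub-groups, each shard registers on its own group, so the names no longer collide.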

Which reporter are you using?

I would recommend modifying the reporter you're using so that it uses the logical scope instead of the metric identifier, as the PrometheusReporter does.
This gives you a consistent metric identifier (something like "taskmanager.task.operator.kinesis.shard_name.shard_id.max_records_per_fetch"), while tags would be used to differentiate between specific instances.
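
As a rough sketch of the difference, the snippet below builds both kinds of name for two shards of one stream. It is self-contained and does not use Flink's classes: the helper methods, the "kinesis" prefix, the group keys "stream" and "shardId", and the metric name are all assumptions chosen for illustration. The idea is that the full identifier embeds each group's value in the name, while the logical name keeps only the group keys and leaves the values to be sent as tags.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical illustration; none of these helpers are Flink API.
public class ScopeSketch {

    // Full identifier: keys AND values go into the name,
    // so every shard produces a different metric name.
    public static String metricIdentifier(String prefix, Map<String, String> groups, String metric) {
        StringJoiner name = new StringJoiner(".");
        name.add(prefix);
        for (Map.Entry<String, String> group : groups.entrySet()) {
            name.add(group.getKey()).add(group.getValue());
        }
        return name.add(metric).toString();
    }

    // Logical name: only the keys go into the name; the values
    // stay in the map and would be reported as tags.
    public static String logicalName(String prefix, Map<String, String> groups, String metric) {
        StringJoiner name = new StringJoiner(".");
        name.add(prefix);
        for (String key : groups.keySet()) {
            name.add(key);
        }
        return name.add(metric).toString();
    }

    // Key/value groups for one shard, in registration order.
    public static Map<String, String> shard(String stream, String shardId) {
        Map<String, String> groups = new LinkedHashMap<>();
        groups.put("stream", stream);
        groups.put("shardId", shardId);
        return groups;
    }

    public static void main(String[] args) {
        Map<String, String> a = shard("orders", "shardId-000000000000");
        Map<String, String> b = shard("orders", "shardId-000000000001");

        // Two different names, one per shard:
        System.out.println(metricIdentifier("kinesis", a, "millisBehindLatest"));
        System.out.println(metricIdentifier("kinesis", b, "millisBehindLatest"));

        // One shared name, distinguished only by tags:
        System.out.println(logicalName("kinesis", a, "millisBehindLatest") + " tags=" + a);
        System.out.println(logicalName("kinesis", b, "millisBehindLatest") + " tags=" + b);
    }
}
```

With the logical name, more than 1000 shards collapse into a single metric name in the backend, and aggregating across shards becomes a matter of grouping or filtering by tag.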

On 07/10/2019 10:26, Yitzchak Lieberman wrote:
Hi. yes, I prefer to have the option to remove new metric groups. [...]