Get watermark metric as a delta of current time

knur
Hello.

Flink emits watermark metrics (currentWatermark) as a Unix timestamp, which is useful in some contexts but troublesome in others. For instance, when sending data to Datadog, there is no way to meaningfully view or act on this metric, because Datadog has no support for timestamps.

A more useful metric would be the delta between the current watermark and the wall-clock time.
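
To make the intended metric concrete, here is a minimal sketch of the computation. The function name watermarkLagMillis is hypothetical, not something Flink provides. It assumes that the two sentinel watermark values, Long.MIN_VALUE (no watermark seen yet) and Long.MAX_VALUE (final watermark at end of input), should be reported as "no lag available", since subtracting them from the clock would overflow or give a meaningless number:

```kotlin
// Hypothetical helper, not part of Flink: lag of the watermark behind the wall clock.
// Returns null when the watermark is one of the two sentinel values.
fun watermarkLagMillis(
    currentWatermark: Long,
    nowMillis: Long = System.currentTimeMillis()
): Long? =
    when (currentWatermark) {
        Long.MIN_VALUE -> null // no watermark has been emitted yet
        Long.MAX_VALUE -> null // final watermark, the input has ended
        else -> nowMillis - currentWatermark
    }

fun main() {
    val now = 1_547_503_380_000L // an arbitrary epoch-millis instant
    println(watermarkLagMillis(now - 42_000L, now)) // prints 42000
    println(watermarkLagMillis(Long.MIN_VALUE, now)) // prints null
}
```

A plain number of milliseconds like this is something a dashboard can graph and alert on directly.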

So I was trying to emit that metric myself from my job, but I'm quite lost. This is what I have tried:

1. I used a RichMapFunction, expecting to somehow get the current watermark from the runtime context. I could not figure out how to do that, so I tried hacking the metrics to pull the watermark gauge out of the metric group. Something like this:

// Looks up an already-registered gauge by reading the private "metrics" map
// of the operator's metric group via reflection.
@Suppress("UNCHECKED_CAST")
private fun getOperatorWatermarkGauge(metricName: String): Gauge<Long> {
  return try {
    val metricsField = AbstractMetricGroup::class.java.getDeclaredField("metrics")
    metricsField.isAccessible = true
    val metrics = metricsField.get(runtimeContext.metricGroup) as Map<String, Metric>
    // Throws (and falls through to the catch block) if no metric has this name
    metrics[metricName] as Gauge<Long>
  } catch (e: Exception) {
    LOGGER.error("Failed to get input watermark metric. Using no-op one", e)
    Gauge { 0L } // NO-OP gauge
  }
}

My idea was to read the current watermark from the inner gauge and then emit the delta. That didn't work (the gauge does not return sensible values).

2. I tried creating a custom operator based on TimestampsAndPeriodicWatermarksOperator that overrides the processWatermark function to get the current watermark. For some reason, that method is not called at all.

3. I might try to wrap the Datadog reporter to intercept the watermark gauges and emit the delta from there.
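
For option 3, the core of it would be a gauge that decorates the original watermark gauge. Below is a rough sketch under a few assumptions: the Gauge interface is a local stand-in with the same shape as org.apache.flink.metrics.Gauge so that the snippet runs without Flink, the class name WatermarkLagGauge is made up, and -1 is an arbitrary "no watermark" sentinel. A wrapping reporter could substitute such a gauge in its notifyOfAddedMetric callback before delegating to the real reporter, but that part is not shown here:

```kotlin
// Local stand-in shaped like org.apache.flink.metrics.Gauge (assumption for this sketch).
fun interface Gauge<T> {
    fun getValue(): T
}

// Hypothetical decorator: reports wall-clock time minus the wrapped watermark gauge.
class WatermarkLagGauge(
    private val watermark: Gauge<Long>,
    private val clock: () -> Long = System::currentTimeMillis
) : Gauge<Long> {
    override fun getValue(): Long {
        val wm = watermark.getValue()
        // Sentinel watermarks carry no usable timestamp; -1 is an arbitrary marker.
        return if (wm == Long.MIN_VALUE || wm == Long.MAX_VALUE) -1L else clock() - wm
    }
}

fun main() {
    // Fixed clock and watermark so the result is deterministic.
    val lag = WatermarkLagGauge(Gauge { 1_000L }, clock = { 6_000L })
    println(lag.getValue()) // prints 5000
}
```

Passing the clock in as a parameter keeps the gauge testable, and the delta is computed at the moment the reporter polls the gauge rather than when the watermark advanced.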

So before I keep digging into this, I would like more opinions, because right now it feels like I'm fighting against the API, and it seems to me there should be a clean way to achieve this.

Thanks.

Re: Get watermark metric as a delta of current time

Andrey Zagrebin-2
Hi Cristian,

Have you tried extending AbstractUdfStreamOperator and overriding processWatermark?
This method should be called with each new, increasing watermark. Are you using processing time or event time for your records?
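
A minimal sketch of that suggestion in Kotlin, modelled on how a map operator is typically built on AbstractUdfStreamOperator. The class name WatermarkLagMap, the metric name watermarkLagMs and the -1 sentinel are made up for illustration. It assumes flink-streaming-java is on the classpath and has not been run against any particular Flink version:

```kotlin
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.metrics.Gauge
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
import org.apache.flink.streaming.api.operators.OneInputStreamOperator
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord

// Hypothetical operator: applies a MapFunction and exposes the watermark lag as a gauge.
class WatermarkLagMap<IN, OUT>(mapper: MapFunction<IN, OUT>) :
    AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>(mapper),
    OneInputStreamOperator<IN, OUT> {

    // Written by the task thread, read by the metric reporter thread.
    @Volatile
    private var lastWatermark = Long.MIN_VALUE

    override fun open() {
        super.open()
        // The delta is evaluated each time the reporter polls the gauge.
        metricGroup.gauge<Long, Gauge<Long>>("watermarkLagMs", Gauge<Long> {
            val wm = lastWatermark
            // -1 is an arbitrary marker for "no usable watermark" (assumption).
            if (wm == Long.MIN_VALUE || wm == Long.MAX_VALUE) -1L
            else System.currentTimeMillis() - wm
        })
    }

    override fun processElement(element: StreamRecord<IN>) {
        output.collect(element.replace(userFunction.map(element.value)))
    }

    override fun processWatermark(mark: Watermark) {
        lastWatermark = mark.timestamp
        super.processWatermark(mark) // keep forwarding the watermark downstream
    }
}
```

It would be attached with something like stream.transform("watermark-lag", outTypeInfo, WatermarkLagMap(mapper)), where outTypeInfo is the TypeInformation of the mapper's output.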

Best,
Andrey

On Mon, Jan 14, 2019 at 11:03 PM Cristian <[hidden email]> wrote: