Registering custom metrics does not work

Registering custom metrics does not work

wyphao.2007
Hi all,
I want to measure each element's latency before it is written to Elasticsearch, so I registered a custom metric as follows:

class CustomElasticsearchSinkFunction extends ElasticsearchSinkFunction[EventEntry] {
  private var metricGroup: Option[MetricGroup] = None
  private var latency: Long = _

  private def init(runtimeContext: RuntimeContext): Unit = {
    if (metricGroup.isEmpty) {
      metricGroup = Some(runtimeContext.getMetricGroup)
      metricGroup.get.gauge[Long, Gauge[Long]]("esLatency", ScalaGauge[Long](() => latency))
    }
  }

  def createIndexRequest(element: EventEntry, runtimeContext: RuntimeContext): IndexRequest = {
    init(runtimeContext)
    latency = System.currentTimeMillis() - element.executeTime.getMillis
    Requests.indexRequest.index("test").`type`("event").source(element.json)
  }

  override def process(element: EventEntry,
                       runtimeContext: RuntimeContext,
                       requestIndexer: RequestIndexer): Unit =
    requestIndexer.add(createIndexRequest(element, runtimeContext))
}

But that does not seem to work. Does anyone know why?

Regards
wyp

Re: Registering custom metrics does not work

Chesnay Schepler
Hello,

Please provide more information about how it is not working as expected.

Does it throw an exception or log a warning? Is the metric not registered at all, or does its value not change?

On 06.07.2017 08:10, wyphao.2007 wrote:
[...]

Re:Re: Registering custom metrics does not work

wyphao.2007
Hi Chesnay, thank you for your reply.

The metric in the code above does not get registered at all.



On 2017-07-06 at 14:45, "Chesnay Schepler" <[hidden email]> wrote:

             
[...]

Re: Registering custom metrics does not work

Chesnay Schepler
How are you verifying whether it is registered?

For the sake of covering all angles: Are you certain that createIndexRequest is called?
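
To illustrate why that question matters: in the posted code the gauge is registered lazily inside init, which only runs once an element reaches process. The following is a minimal, Flink-free sketch of that pattern, not the actual connector behaviour. FakeMetricGroup and LazyLatencySink are hypothetical stand-ins invented for this example; they are not Flink classes.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a metric group: it only records gauge names and suppliers.
final class FakeMetricGroup {
  val gauges: mutable.Map[String, () => Long] = mutable.Map.empty
  def gauge(name: String, value: () => Long): Unit = gauges.update(name, value)
}

// Mirrors the lazy registration in the post: the gauge is registered
// the first time an element is processed, never before.
final class LazyLatencySink(group: FakeMetricGroup) {
  private var registered = false
  private var latency: Long = 0L

  private def init(): Unit =
    if (!registered) {
      group.gauge("esLatency", () => latency)
      registered = true
    }

  def process(eventTimeMillis: Long, nowMillis: Long): Unit = {
    init()
    latency = nowMillis - eventTimeMillis
  }
}

object LazyRegistrationDemo {
  def main(args: Array[String]): Unit = {
    val group = new FakeMetricGroup
    val sink = new LazyLatencySink(group)
    println(group.gauges.contains("esLatency")) // false: nothing processed yet
    sink.process(eventTimeMillis = 1000L, nowMillis = 1250L)
    println(group.gauges.contains("esLatency")) // true: registered on first element
    println(group.gauges("esLatency")())        // 250
  }
}
```

If the same holds in the real job, a sink that has not yet received any element would show no esLatency metric at all, which would look exactly like a failed registration.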

On 06.07.2017 08:51, wyphao.2007 wrote:
[...]