Hi all,

I want to measure each element's latency before it is written to Elasticsearch, so I registered a custom metric as follows:

    class CustomElasticsearchSinkFunction extends ElasticsearchSinkFunction[EventEntry] {
      private var metricGroup: Option[MetricGroup] = None
      private var latency: Long = _

      // Registers the gauge once, the first time an element is processed.
      private def init(runtimeContext: RuntimeContext): Unit = {
        if (metricGroup.isEmpty) {
          metricGroup = Some(runtimeContext.getMetricGroup)
          metricGroup.get.gauge[Long, Gauge[Long]]("esLatency", ScalaGauge[Long](() => latency))
        }
      }

      def createIndexRequest(element: EventEntry, runtimeContext: RuntimeContext): IndexRequest = {
        init(runtimeContext)
        // Time elapsed between the element's execute time and now.
        latency = System.currentTimeMillis() - element.executeTime.getMillis
        Requests.indexRequest.index("test").`type`("event").source(element.json)
      }

      override def process(element: EventEntry, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit =
        requestIndexer.add(createIndexRequest(element, runtimeContext))
    }

But that does not seem to work. Does anyone know why?

Regards,
wyp
Hello,

Please provide more information about how it is not working as expected. Does it throw an exception or log a warning? Is the metric not registered at all, or does its value not change?

On 06.07.2017 08:10, wyphao.2007 wrote:
Hi Chesnay,

Thank you for your reply. The metric in the code above does not get registered at all.

On 2017-07-06 at 14:45, "Chesnay Schepler" <[hidden email]> wrote:
How are you verifying whether it is registered?
For the sake of covering all angles: are you certain that createIndexRequest is called?

On 06.07.2017 08:51, wyphao.2007 wrote:
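
In the posted code the gauge is registered inside init, which runs only when createIndexRequest is invoked for an element, so the metric cannot appear before the first element reaches the sink. The following is a minimal, dependency-free sketch of that lazy-registration behaviour. FakeMetricRegistry and LazySink are hypothetical stand-ins invented for illustration; they are not Flink classes and do not reproduce Flink's MetricGroup API.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a metric registry; NOT Flink's MetricGroup.
final class FakeMetricRegistry {
  private val gauges = mutable.LinkedHashMap.empty[String, () => Long]

  def gauge(name: String, read: () => Long): Unit = gauges.update(name, read)
  def isRegistered(name: String): Boolean = gauges.contains(name)
  def read(name: String): Option[Long] = gauges.get(name).map(_.apply())
}

// Hypothetical sink that mirrors the lazy-init pattern from the post:
// the gauge is registered on the first processed element, not at construction.
final class LazySink(registry: FakeMetricRegistry) {
  private var registered = false
  private var latency: Long = 0L

  private def init(): Unit =
    if (!registered) {
      registry.gauge("esLatency", () => latency)
      registered = true
    }

  def process(eventTimeMillis: Long, nowMillis: Long): Unit = {
    init()
    latency = nowMillis - eventTimeMillis
  }
}

object LazyRegistrationDemo {
  def main(args: Array[String]): Unit = {
    val registry = new FakeMetricRegistry
    val sink = new LazySink(registry)

    println(registry.isRegistered("esLatency")) // false: no element processed yet
    sink.process(eventTimeMillis = 1000L, nowMillis = 1250L)
    println(registry.isRegistered("esLatency")) // true
    println(registry.read("esLatency"))         // Some(250)
  }
}
```

If the same holds in the real job, a metric that is missing from the reporter may simply mean that no element has passed through the sink yet, which is worth ruling out before looking for a registration bug.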