Counter Implementation in Flink

The author has deleted this message.

Re: Counter Implementation in Flink

Chesnay Schepler
The default implementation is the SimpleCounter; however, I believe the
implementation is of less interest than how the counter is used.
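For reference, the counter semantics involved are simple. The following is a minimal sketch modeled on the behavior of Flink's SimpleCounter, which is a plain, unsynchronized long held in memory by one task instance. The class names SketchCounter and SketchCounterDemo are hypothetical, this is not the Flink source, and it has no dependency on flink-metrics-core.

```java
// Hypothetical counter modeled on the semantics of Flink's SimpleCounter:
// a plain, unsynchronized long held in memory by a single task instance.
class SketchCounter {
    private long count; // starts at 0 whenever a task instance creates the counter

    void inc() { count++; }
    void inc(long n) { count += n; }
    void dec() { count--; }
    void dec(long n) { count -= n; }
    long getCount() { return count; }
}

class SketchCounterDemo {
    public static void main(String[] args) {
        SketchCounter c = new SketchCounter();
        c.inc();   // one record
        c.inc(4);  // four more records
        c.dec();   // undo one
        System.out.println(c.getCount()); // prints 4
    }
}
```

Because the value lives only in the task's memory, it is not part of any checkpointed state: a redeployed task creates a fresh counter that starts again from zero.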

Please expand on the graph (which metric is represented by which color)
and include all of your code that interacts with your counter.

On 24.11.2018 16:36, Anil wrote:

> I'm using Flink 1.4.2 and deploying the job on a YARN cluster.
> I have a streaming job which flattens the data and outputs it. It takes an
> input record and produces n output records. I'm using a TableFunction for
> this; the flattening logic is implemented in a UDF. The UDF has a counter
> that counts the number of records produced:
>             this.context.getMetricGroup().counter("output_records_counter")
> I know Flink provides the numRecordsOut metric, which essentially gives me
> the same number.
> When the job is started, the output record counts reported by the
> `output_records_counter` counter and `numRecordsOut` are exactly the same.
> When a task manager is lost and the job is restarted, there's a huge
> difference in the count of output records. As seen in the graph, when the
> job was started both counts overlap. When a task manager is lost and
> redeployed, the counts differ. I'm not sure why this number varies so much.
> Can someone please shed some light on how this counter is implemented, or
> direct me to the source code or any reference material?
> For numRecordsOut, each task manager emits the count of data. Is the same not
> true for output_records_counter?
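To illustrate how such a counter behaves in use: a counter obtained from the function's metric group is scoped to one parallel subtask, in the same way as numRecordsOut, so a reporter sees one series per subtask. The plain-Java simulation below assumes a parallelism of 2 and a fan-out of 3 output records per input; FlattenSubtask, SubtaskCounter and RestartDemo are hypothetical stand-ins, not Flink classes. It shows that the sum over the live subtask counters drops back to zero once the tasks are redeployed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-subtask counter with SimpleCounter-like semantics.
class SubtaskCounter {
    private long count;

    void inc() { count++; }
    long getCount() { return count; }
}

// Hypothetical model of one parallel instance of the flattening UDF.
// Each deployment creates a new instance, and with it a new counter at 0.
class FlattenSubtask {
    final SubtaskCounter outputRecords = new SubtaskCounter();

    // Flattens one input record into `fanOut` output records, counting each one.
    void process(int fanOut) {
        for (int i = 0; i < fanOut; i++) {
            outputRecords.inc();
        }
    }
}

class RestartDemo {
    // Creates `parallelism` fresh subtask instances, as a (re)deployment would.
    static List<FlattenSubtask> deploy(int parallelism) {
        List<FlattenSubtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new FlattenSubtask());
        }
        return subtasks;
    }

    // Sums the counters of the currently deployed subtasks.
    static long total(List<FlattenSubtask> subtasks) {
        long sum = 0;
        for (FlattenSubtask s : subtasks) {
            sum += s.outputRecords.getCount();
        }
        return sum;
    }

    public static void main(String[] args) {
        List<FlattenSubtask> run1 = deploy(2);
        for (FlattenSubtask s : run1) {
            for (int r = 0; r < 10; r++) {
                s.process(3); // 10 inputs x 3 outputs per subtask
            }
        }
        System.out.println("before restart: " + total(run1)); // prints 60

        // After a failure the job is redeployed: new instances, new counters.
        List<FlattenSubtask> run2 = deploy(2);
        System.out.println("after restart: " + total(run2)); // prints 0
    }
}
```

This reset applies equally to numRecordsOut, so it does not by itself explain why the two series diverge. It does mean that any dashboard comparing cumulative values has to account for counters restarting at zero and, if checkpointing is enabled, for records replayed from the last checkpoint being counted again after recovery.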