Count of records coming into the Flink App from KDS and at each step through Execution Graph


Vijay Balakrishnan
Hi,
I am trying to figure out how many records came into the Flink App from KDS, and how many records got moved to the next step or were dropped by the watermarks.

In the UI table, I see a Records Sent total for the Source, and a Records Received total for the next step, the Filter->FlatMap operator. How can I get these metric values to display in Grafana? For example, I want to know, for each 5 secs, how many records came in and how many were filtered out by the watermark or by my custom Filter operator.

I looked at the breakdown of the Source__Custom_Source in Metrics, as shown in the attached pic. It has values like 0.numRecordsIn and 0.numRecordsOut, and so on from 0 to 9 for the parallelism of 10 that I specified. It also has various breakdowns like 0.Timestamps/Watermarks.numRecordsIn and 0.Timestamps/Watermarks.numRecordsOut.

Attached are some screenshots of the Flink DashBoard UI.

TIA,


Attachments: Screen Shot 2020-07-23 at 3.38.43 PM.png (357K), Screen Shot 2020-07-23 at 3.39.58 PM.png (315K)

Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

David Anderson-3
Setting up a Flink metrics dashboard in Grafana requires setting up and configuring one of Flink's metrics reporters [1] that is supported by Grafana as a data source. That means your options for a metrics reporter are Graphite, InfluxDB, Prometheus, or the Prometheus push reporter.

If you want reporting every 5 seconds, with the push-based reporters that's something you would configure in flink-conf.yaml, whereas with Prometheus you'll need to configure the scrape interval in the Prometheus config file. For more on using Flink with Prometheus, see the blog post by Maximilian Bode [2].
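
As a rough sketch (the reporter name "prom" and the ports here are arbitrary; check the metrics documentation for your Flink version):

  # flink-conf.yaml - pull-based Prometheus reporter
  metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
  metrics.reporter.prom.port: 9249

  # push-based alternative (e.g. Graphite) - here the interval is set in Flink:
  # metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
  # metrics.reporter.grph.host: localhost
  # metrics.reporter.grph.port: 2003
  # metrics.reporter.grph.interval: 5 SECONDS

With the pull-based reporter, the 5-second cadence would then come from the Prometheus side:

  # prometheus.yml (sketch)
  scrape_configs:
    - job_name: 'flink'
      scrape_interval: 5s
      static_configs:
        - targets: ['taskmanager-host:9249']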


On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan <[hidden email]> wrote:

Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

Vijay Balakrishnan
Hi David,
Thanks for your reply.
I am already using the PrometheusReporter. I am trying to figure out how to count incoming records grouped by an attribute called event_name in the application data, and report that to Grafana via Prometheus.

I see the following at a high level
task_numRecordsIn
task_numRecordsOut
..operator_numLateRecordsDropped

I'm trying to dig deeper than this numRecordsIn, to get counts grouped by the event_name attribute in the incoming record every 5 secs.
TIA,

On Sat, Jul 25, 2020 at 10:55 AM David Anderson <[hidden email]> wrote:


Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

Vijay Balakrishnan
Hi All,
I am looking at Custom User Metrics to count incoming records by an incoming attribute, event_name, and aggregate it over 5 secs.
I am trying to figure out which one to use: Counter or Meter.
If using a Counter, how do I reset it after 5 secs?
If using a Meter, which measures average throughput, how do I specify a duration like 5 secs? markEvent(long n)?

I am also trying to collect a total count of events across all TaskManagers.
Do I collect at flink_taskmanager_job_task_<customMetricName>_numRecordsIn or
flink_taskmanager_job_task_operator_<customMetricName>_numRecordsIn? (i.e., at the task or at the operator level?)

Or should I use User variables like below:
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue") // one group per custom event_name; I might not know all event_names in advance
  .counter("myCounter");

Pardon my confusion here.
TIA,

On Mon, Jul 27, 2020 at 10:00 AM Vijay Balakrishnan <[hidden email]> wrote:


Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

Chesnay Schepler
I'd recommend doing the aggregation over 5 seconds in Graphite/Prometheus etc., and exposing a counter in Flink for each attribute/event_name.

User variables are a good choice for encoding the attribute/event_name values.
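
Roughly like this, as a minimal sketch (Event and getEventName() are placeholders for your record type; the metric name is just an example):

  import java.util.HashMap;
  import java.util.Map;

  import org.apache.flink.api.common.functions.RichFlatMapFunction;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.metrics.Counter;
  import org.apache.flink.util.Collector;

  public class EventCountingFunction extends RichFlatMapFunction<Event, Event> {

      // one counter per event_name, created lazily since the
      // event names are not known in advance
      private transient Map<String, Counter> counters;

      @Override
      public void open(Configuration parameters) {
          counters = new HashMap<>();
      }

      @Override
      public void flatMap(Event event, Collector<Event> out) {
          Counter counter = counters.computeIfAbsent(event.getEventName(), name ->
              getRuntimeContext()
                  .getMetricGroup()
                  .addGroup("event_name", name) // user variable; becomes a label in Prometheus
                  .counter("eventCount"));
          counter.inc();
          out.collect(event);
      }
  }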

As for your remaining questions:

Flink does not support aggregating operator-level metrics across task executors. This job is left to proper time-series databases.

A counter can be reset like this: counter.dec(counter.getCount())
You can also create a custom implementation with whatever behavior you desire.
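For example, a sketch of a resettable counter (the getAndReset() helper is just one possible design):

  import java.util.concurrent.atomic.AtomicLong;
  import org.apache.flink.metrics.Counter;

  // A Counter whose value can be read and zeroed in one atomic step.
  public class ResettableCounter implements Counter {
      private final AtomicLong count = new AtomicLong();

      @Override public void inc() { count.incrementAndGet(); }
      @Override public void inc(long n) { count.addAndGet(n); }
      @Override public void dec() { count.decrementAndGet(); }
      @Override public void dec(long n) { count.addAndGet(-n); }
      @Override public long getCount() { return count.get(); }

      public long getAndReset() { return count.getAndSet(0); }
  }

You would register it with getRuntimeContext().getMetricGroup().counter("myCounter", new ResettableCounter()).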

The default meter implementation (MeterView) calculates the rate of events per second based on counts that are gathered periodically over some time span (usually 1 minute). If you want to calculate the rate-per-second over the last 5 seconds, then new MeterView(5) should do the trick.
If you want a rate-per-5-seconds, then you will need to implement a custom meter. Note that I would generally discourage this, as it will not work properly with some metric systems that assume rates to be per-second.
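
Registering the per-second rate averaged over the last 5 seconds would look roughly like this (the metric name is just an example):

  import org.apache.flink.metrics.Meter;
  import org.apache.flink.metrics.MeterView;

  // rate of events per second, averaged over the last 5 seconds
  Meter meter = getRuntimeContext()
      .getMetricGroup()
      .meter("eventsPerSecond", new MeterView(5));

  // then, for each incoming record:
  meter.markEvent();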

On 27/07/2020 19:59, Vijay Balakrishnan wrote:



Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

Vijay Balakrishnan
Hi Chesnay,
Thx for your reply.

To summarize:
Use a Counter:
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue") // one group per custom event_name; I might not know all event_names in advance
  .counter("myCounter");
This MyMetricsValue will show up in Prometheus as, e.g., 0.Window(TumblingWindow...)__EventTimeTrigger_MyClass.myMetricsKey.myCounter, and likewise for 1.Window(TumblingWindow...)... and so on, one per parallel operator instance.
This will then have to be aggregated in Prometheus over 5 secs across all <parallelismCount> instances (see the PromQL sketch at the end of this mail).

Window(TumblingWindow...)__EventTimeTrigger_MyClass.myMetricsKey.myCounter // no task executors here - this is at the operator level???
This is independent of task executors, right? How does your statement - "Flink does not support aggregating operator-level metrics across task executors. This job is left to proper time-series databases." - relate to my summary above?
Also, I am assuming that the Counter will get reset after every window interval of 5 secs - or do I need to do counter.dec(counter.getCount()) in the close() method, as you showed above?
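
For the aggregation, I'm picturing something like this PromQL (the metric name is my guess at what the reporter will emit; it depends on the scope format):

  sum by (MyMetricsKey) (increase(flink_taskmanager_job_task_operator_myCounter[5s]))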
TIA,




On Wed, Jul 29, 2020 at 2:53 AM Chesnay Schepler <[hidden email]> wrote:



Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

Chesnay Schepler
If you do the aggregation in Prometheus, I would think that you do not need to reset the counter; but it's been a while since I've used it.
Flink will not automatically reset counters.
If a reset is necessary, then you will have to do it manually every 5 seconds.

The name under which it will be exposed to Prometheus depends on the configured scope format; see the metric documentation for details.
By default it will contain information about the task executor, job, task, etc.
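
For reference, the operator- and task-level defaults look roughly like this in flink-conf.yaml (verify against the documentation of your version):

  metrics.scope.task: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
  metrics.scope.operator: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>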

On 30/07/2020 22:02, Vijay Balakrishnan wrote:




Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

anuj.aj07
Hello Vijay,
I have the same use case, where I am reading from Kafka and want to report a count corresponding to each event every 5 mins. On Prometheus, I want to set an alert if, for any event, we do not receive anything - say the count is zero.

So can you please help me with how you implemented this finally? 
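
On the alerting side, what I have sketched so far is something like this PromQL (assuming a per-event_name counter like the one discussed above; the metric and label names are assumptions):

  # fire if some event_name produced no records over the last 5 minutes
  sum by (event_name) (increase(flink_taskmanager_job_task_operator_eventCount[5m])) == 0

(An event_name that never appears at all has no series, so that case may additionally need absent() or a recording rule.)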

On Fri, Jul 31, 2020 at 2:14 AM Chesnay Schepler <[hidden email]> wrote:





--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07