Hi, I am trying to figure out how many records came into the Flink app from KDS, and how many records were passed on to the next step or dropped by the watermarks. In the UI table I see a Records Sent total for the Source, and a Records Received total for the next step, the Filter -> FlatMap operator. How can I get these metric values to display in Grafana? For example, I want to know, for each 5-second interval, how many records came in and how many were filtered out by the watermark or by my custom Filter operator. I looked at the breakdown of Source__Custom_Source in the Metrics tab, as shown in the attached screenshots. It has values like 0.numRecordsIn and 0.numRecordsOut, and so on from 0 to 9 for the parallelism of 10 I specified. It also has breakdowns like 0.Timestamps/Watermarks.numRecordsIn and 0.Timestamps/Watermarks.numRecordsOut. Attached are some screenshots of the Flink dashboard UI. TIA,
Setting up a Flink metrics dashboard in Grafana requires setting up and configuring one of Flink's metrics reporters [1] that is supported by Grafana as a data source. That means your options for a metrics reporter are Graphite, InfluxDB, Prometheus, or the Prometheus push reporter. If you want reporting every 5 seconds, with the push-based reporters that's something you would configure in flink-conf.yaml, whereas with Prometheus you'll need to configure the scrape interval in the Prometheus config file. For more on using Flink with Prometheus, see the blog post by Maximilian Bode [2]. On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan <[hidden email]> wrote:
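As an editorial aside, the two configuration points mentioned above might look something like the following sketch. The port, job name, and target address are assumptions for illustration only; check your own deployment.

```yaml
# flink-conf.yaml: register the PrometheusReporter (pull-based, so no
# interval here; the port is an assumption, 9249 is the usual default).
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249

# prometheus.yml: for a pull-based setup, the 5-second cadence is the
# scrape interval on the Prometheus side, not a Flink setting.
scrape_configs:
  - job_name: 'flink'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9249']
```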
Hi David, Thanks for your reply. I am already using the PrometheusReporter. I am trying to figure out how to dig into the application data, count records grouped by an attribute called event_name in the incoming data, and report that to Grafana via Prometheus. At a high level I see the following: task_numRecordsIn, task_numRecordsOut, ..operator_numLateRecordsDropped. I am trying to dig in deeper than this numRecordsIn, to get counts grouped by the event_name attribute coming in on the input record every 5 secs. TIA, On Sat, Jul 25, 2020 at 10:55 AM David Anderson <[hidden email]> wrote:
Hi All, I am looking at the custom user metrics to count incoming records by an incoming attribute, event_name, and aggregate it over 5 secs. I looked at https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter. I am trying to figure out which one to use, Counter or Meter. If using a Counter, how do I reset it after 5 secs? If using a Meter, which measures avg throughput, how do I specify a duration like 5 secs? markEvent(long n)??? I am also trying to collect the total count of events across all TaskManagers. Do I collect at flink_taskmanager_job_task_<customMetricName>_numrecordsIn or flink_taskmanager_job_task_operator_<customMetricName>_numrecordsIn? (i.e., at the task or operator level?) Or should I use user variables?
Pardon my confusion here. TIA, On Mon, Jul 27, 2020 at 10:00 AM Vijay Balakrishnan <[hidden email]> wrote:
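Editorial note: in Flink, a per-event_name counter would typically be registered from a RichFunction, along the lines of getRuntimeContext().getMetricGroup().addGroup("event_name", name).counter("eventCount"). The following is a minimal plain-Java sketch (no Flink dependency, so it runs standalone) of the same idea: one counter per distinct event_name value, registered lazily on first sight. The names eventCount and event_name are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: one counter per distinct event_name, mirroring what a Flink
// RichFunction could register via addGroup("event_name", name).counter(...).
public class PerEventCounters {
    private final Map<String, Long> counters = new HashMap<>();

    // Called once per incoming record; creates the counter on first sight.
    public void countEvent(String eventName) {
        counters.merge(eventName, 1L, Long::sum);
    }

    public long getCount(String eventName) {
        return counters.getOrDefault(eventName, 0L);
    }

    public static void main(String[] args) {
        PerEventCounters m = new PerEventCounters();
        for (String e : new String[] {"click", "view", "click"}) {
            m.countEvent(e);
        }
        System.out.println(m.getCount("click")); // prints 2
        System.out.println(m.getCount("view"));  // prints 1
    }
}
```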
I'd recommend doing the aggregation over 5 seconds in Graphite/Prometheus etc., and exposing a counter in Flink for each attribute/event_name. User variables are a good choice for encoding the attribute/event_name values.
As for your remaining questions: Flink does not support aggregating operator-level metrics across task executors. This job is left to proper time-series databases.
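Editorial note: with user variables, the event_name value becomes a Prometheus label, so the cross-task-executor aggregation could be a query like the one below. The metric name is an assumption; the actual name depends on the configured scope format.

```
sum by (event_name) (increase(flink_taskmanager_job_task_operator_eventCount[1m]))
```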
A counter can be reset like this: counter.dec(counter.getCount()). You can also create a custom implementation with whatever behavior you desire.
The default meter implementation (MeterView) calculates the rate of events per second based on counts that are periodically gathered over some time period (usually 1 minute). If you want to calculate the rate-per-second over the last 5 seconds, then new MeterView(5) should do the trick.
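Editorial note: the mechanism behind a MeterView-style rate can be sketched in a few lines of self-contained Java. This is an illustration of the idea (sample a monotonically increasing count at fixed intervals, report the difference over the time span), not Flink's actual class.

```java
// Sketch of a MeterView-style rate: sample a growing counter once per
// second into a ring buffer, and report (newest - oldest) / timeSpan
// as the average events-per-second over the last timeSpanSeconds.
public class RateOverWindow {
    private final long[] samples; // ring buffer of counter samples
    private int idx = 0;
    private final int timeSpanSeconds;

    public RateOverWindow(int timeSpanSeconds) {
        this.timeSpanSeconds = timeSpanSeconds;
        this.samples = new long[timeSpanSeconds + 1];
    }

    // Call once per second with the current counter value.
    public void update(long currentCount) {
        idx = (idx + 1) % samples.length;
        samples[idx] = currentCount;
    }

    // Average events per second over the last timeSpanSeconds.
    public double getRate() {
        long oldest = samples[(idx + 1) % samples.length];
        return (samples[idx] - oldest) / (double) timeSpanSeconds;
    }

    public static void main(String[] args) {
        RateOverWindow meter = new RateOverWindow(5);
        long count = 0;
        for (int sec = 1; sec <= 5; sec++) {
            count += 10; // 10 events arrive each second
            meter.update(count);
        }
        System.out.println(meter.getRate()); // prints 10.0
    }
}
```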
If you want to have a rate-per-5-seconds, then you will need to implement a custom meter. Note that I would generally discourage this, as it will not work properly with some metric systems which assume rates to be per-second.
On 27/07/2020 19:59, Vijay Balakrishnan wrote:
Hi David, Thanks for your reply. To summarize: use a Counter. Also, will the Counter get reset after every window interval of 5 secs, or do I need to call counter.dec(counter.getCount()) in the close() method, as you showed above? TIA,
On Wed, Jul 29, 2020 at 2:53 AM Chesnay Schepler <[hidden email]> wrote:
If you do the aggregation in Prometheus, I would think that you do not need to reset the counter; but it's been a while since I've used it.
Flink will not automatically reset counters. If this is necessary, then you will have to manually reset the counter every 5 seconds.
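Editorial note: the reset idiom mentioned earlier in the thread, counter.dec(counter.getCount()), can be demonstrated against a minimal stand-in for Flink's Counter interface (which exposes inc, dec, and getCount). This is plain Java so it runs standalone; in Flink you would call the same methods on the registered Counter.

```java
// Minimal stand-in for Flink's Counter interface (inc/dec/getCount),
// showing the manual reset idiom counter.dec(counter.getCount()).
public class SimpleCounter {
    private long count = 0;

    public void inc() { count++; }
    public void dec(long n) { count -= n; }
    public long getCount() { return count; }

    public static void main(String[] args) {
        SimpleCounter counter = new SimpleCounter();
        counter.inc();
        counter.inc();
        counter.inc();
        // Reset: subtract the current value, e.g. from a timer every 5 seconds.
        counter.dec(counter.getCount());
        System.out.println(counter.getCount()); // prints 0
    }
}
```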
The name under which it will be exposed to Prometheus depends on the configured scope format; see the metrics documentation for details. By default it will contain information about the task executors, job, task, etc.
On 30/07/2020 22:02, Vijay Balakrishnan wrote:
Hello Vijay, I have the same use case, where I am reading from Kafka and want to report a count for each event every 5 mins. On Prometheus, I want to set an alert if, for any event, we do not receive any records (say the count is zero). So can you please help me with how you implemented this in the end? On Fri, Jul 31, 2020 at 2:14 AM Chesnay Schepler <[hidden email]> wrote:
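Editorial note: the zero-count alert described above could be expressed as a Prometheus alerting rule along these lines. The metric name and the event_name label are assumptions and depend on your reporter's scope format.

```yaml
# Hypothetical Prometheus rules file: fire when a given event_name has
# seen no new records for 5 minutes. Metric/label names are assumptions.
groups:
  - name: flink-events
    rules:
      - alert: NoEventsReceived
        expr: increase(flink_taskmanager_job_task_operator_eventCount[5m]) == 0
        for: 5m
        annotations:
          summary: "No records for event_name {{ $labels.event_name }} in 5m"
```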