Background: We are running a job with a ProcessFunction that reads data from Kafka, fires ~5-10K timers per second, and sends matched events to a KafkaSink. We collect metrics such as the number of active timers and the number of timers scheduled. We use the StatsD reporter and monitor with a Grafana dashboard, and we use RocksDBStateBackend backed by HDFS for state.
Observations/Problems:
1. Counter value suddenly got reset: While the job was running fine, the metric for a monotonically increasing counter (a Counter on which we only call inc()) suddenly dropped to 0 and then resumed counting from there. The only exceptions in the logs were related to transient connectivity issues to datanodes. After inspecting system metrics and checkpoint metrics, we found no other indicator of any failure. It happened just once across multiple runs of the same job.
2. Counters not retained across a Flink restart with a savepoint: I cancelled the job with the -s option to take a savepoint and then restarted the job from that savepoint. After the restart, metrics started from 0. I was expecting the metric values of a given operator to be part of its state.
3. Counter metrics getting sent as gauges: Using tcpdump, I inspected the format in which metrics are sent to StatsD. I observed that even the metrics that are counters in my code were sent as gauges. I don't understand why.
Can anyone offer more insight into why the behaviors above occurred? Also, does Flink store metric values as part of the state for stateful operators? Is there any way to configure that?
|
1. This shouldn't happen. Do you access the counter from different threads?
2. Metrics in general are not persisted across restarts, and there is currently no way to configure Flink to do so.
3. Counters are sent as gauges because, as far as I know, StatsD counters are not allowed to be decremented, whereas Flink's Counter also offers dec().

On 19.05.2017 08:56, jaxbihani wrote:
> [...]
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-metrics-related-problems-questions-tp13218.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
|
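To make the counter-versus-gauge point concrete, here is a minimal sketch of the two StatsD line formats. It is written in Python for brevity and is not the Flink reporter's code. The helper names, the `CumulativeCounter` class, and the metric name `job.timers.active` are all made up for illustration. In the StatsD line format, a `|c` sample carries an increment that the server adds up, while a `|g` sample carries an absolute value.

```python
def format_gauge(name: str, value: int) -> str:
    """Absolute value: the server keeps the last value it received."""
    return f"{name}:{value}|g"


def format_counter_delta(name: str, delta: int) -> str:
    """Increment: the server adds this to its own running total."""
    return f"{name}:{delta}|c"


class CumulativeCounter:
    """Hypothetical stand-in for an in-process counter with inc() and dec()."""

    def __init__(self) -> None:
        self.count = 0

    def inc(self, n: int = 1) -> None:
        self.count += n

    def dec(self, n: int = 1) -> None:
        self.count -= n


timers = CumulativeCounter()
timers.inc(5)
timers.dec(2)

# Reporting the running total as a gauge needs no record of what was sent before.
print(format_gauge("job.timers.active", timers.count))
```

Because a gauge carries the absolute in-process value, anything that resets the in-process counter, such as a restart, shows up directly as a drop to 0 on the dashboard.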
Point 2 isn't quite accurate, actually; metrics on the TaskManager are not persisted across restarts.

On 19.05.2017 11:21, Chesnay Schepler wrote:
> [...]
|
@Chesnay With timers, onTimer() will be called from a different thread than the thread that calls processElement(). If metric updates happen in both, would that be a problem?

> On 19. May 2017, at 11:57, Chesnay Schepler wrote:
>
> [...]
|
Yes, that could cause the observed issue.
The default implementations are not thread-safe; concurrent writes may be lost or overwritten. You will have to either guard accesses to that metric with a synchronized block or implement your own thread-safe counter.

On 22.05.2017 14:17, Aljoscha Krettek wrote:
> [...]
|
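The advice to guard the metric or write a thread-safe counter can be sketched as follows. This is an illustration in Python rather than Flink code, and `SynchronizedCounter` and `run_writers` are hypothetical names. In a Java job the same idea would presumably map to a synchronized block or a Counter backed by java.util.concurrent.atomic.AtomicLong; that mapping is an assumption, not something stated in this thread.

```python
import threading


class SynchronizedCounter:
    """Counter whose updates are serialized by a lock (hypothetical helper)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._count = 0

    def inc(self, n: int = 1) -> None:
        # The read-modify-write is not atomic on its own; the lock makes it so.
        with self._lock:
            self._count += n

    def get_count(self) -> int:
        with self._lock:
            return self._count


def run_writers(counter: SynchronizedCounter, writers: int, incs_per_writer: int) -> int:
    """Increment the counter from several threads and return the final count."""

    def work() -> None:
        for _ in range(incs_per_writer):
            counter.inc()

    threads = [threading.Thread(target=work) for _ in range(writers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.get_count()


total = run_writers(SynchronizedCounter(), writers=4, incs_per_writer=10_000)
print(total)
```

With the lock in place, no increment can be lost, so the final count always equals writers times incs_per_writer.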
Ah ok, the onTimer() and processElement() methods are both protected by synchronized blocks on the same lock, so that shouldn’t be a problem.

> On 22. May 2017, at 15:08, Chesnay Schepler wrote:
>
> [...]
|
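The remark that onTimer() and processElement() run under the same lock can be illustrated with a toy model. This Python sketch does not reproduce Flink internals. `ToyOperator`, `run`, and the thread layout are hypothetical, and the only fact taken from the thread is that both callbacks are invoked under one shared lock.

```python
import threading


class ToyOperator:
    """Toy stand-in for a function with two callbacks (hypothetical, not Flink code)."""

    def __init__(self) -> None:
        self.updates = 0  # plain counter with no locking of its own

    def process_element(self) -> None:
        self.updates += 1

    def on_timer(self) -> None:
        self.updates += 1


def run(operator: ToyOperator, elements: int, timers: int) -> int:
    """Drive both callbacks from separate threads, always under one shared lock."""
    lock = threading.Lock()

    def element_thread() -> None:
        for _ in range(elements):
            with lock:  # the caller holds the lock around the callback
                operator.process_element()

    def timer_thread() -> None:
        for _ in range(timers):
            with lock:  # same lock object, so the two callbacks never overlap
                operator.on_timer()

    threads = [
        threading.Thread(target=element_thread),
        threading.Thread(target=timer_thread),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return operator.updates


result = run(ToyOperator(), elements=10_000, timers=5_000)
print(result)
```

Since every call to either callback happens while the shared lock is held, the unsynchronized counter is never updated concurrently. The guarantee would not extend to a counter touched from a thread that bypasses that lock.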