Re: flink operator's latency metrics continues to increase.


Till Rohrmann
Hi Suxing Lee,

thanks for reaching out to me. I'm forwarding this mail to the user mailing list as well, since it could be interesting for others too.

Your observation could indeed be an indicator of a problem with the latency metric. I quickly checked the code, and at first glance it looks correct to me that we increase the nextTimestamp field by period in RepeatedTriggerTask, because we schedule this task at a fixed rate in SystemProcessingTimeService#scheduleAtFixedRate. Internally, this method calls ScheduledThreadPoolExecutor#scheduleAtFixedRate, which uses System.nanoTime to schedule tasks repeatedly; the same logic is used by ScheduledThreadPoolExecutor#ScheduledFutureTask. If a GC pause or another stop-the-world event happens, it should only affect one latency marker and not all of them (given that System.nanoTime continues to increase), because the next task will be scheduled sooner, since System.nanoTime might have progressed further in the meantime.
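To illustrate what I mean by the fixed-rate semantics, here is a small, self-contained sketch (plain JDK code, not Flink's actual classes; the class name and timings are made up for the example). After a long pause inside the timer thread, the missed firings run back-to-back to catch up rather than permanently shifting all subsequent firings:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedRateCatchUp {

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger count = new AtomicInteger();
        long start = System.nanoTime();

        // ScheduledThreadPoolExecutor#scheduleAtFixedRate schedules against System.nanoTime.
        timer.scheduleAtFixedRate(() -> {
            int n = count.incrementAndGet();
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println("firing " + n + " at +" + elapsedMs + " ms");
            if (n == 3) {
                try {
                    Thread.sleep(500); // simulate a GC-like pause inside the timer thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, 0, 100, TimeUnit.MILLISECONDS);

        Thread.sleep(1500);
        timer.shutdownNow();
    }
}

The output shows one long gap around the simulated pause, followed by several firings in quick succession as the executor catches up.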

What could be a problem is that we compute the latency as System.currentTimeMillis - marker.getMarkedTime. I don't think there is any guarantee that System.currentTimeMillis and System.nanoTime don't drift apart, especially if they are read on different machines. This is something we could check.
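One quick way to check this on a single machine would be something like the following (not Flink-specific; it only illustrates that the two clocks are not guaranteed to advance at exactly the same rate):

public class ClockDriftCheck {

    public static void main(String[] args) throws Exception {
        long wallStart = System.currentTimeMillis();
        long nanoStart = System.nanoTime();

        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            long wallElapsed = System.currentTimeMillis() - wallStart;
            long nanoElapsed = (System.nanoTime() - nanoStart) / 1_000_000;
            // A growing difference means the two clocks drift apart, which would
            // directly skew a latency that mixes the two clocks.
            System.out.println("drift after " + (i + 1) + " s: " + (wallElapsed - nanoElapsed) + " ms");
        }
    }
}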

This link [1] explains the drift problem in a bit more detail.

In any case, I would suggest opening a JIRA issue to report this problem.


Cheers,
Till

On Mon, Dec 17, 2018 at 2:37 PM Suxing Lee <[hidden email]> wrote:
Hi Till Rohrmann,

I was running Flink 1.5.5, and I use Prometheus to collect metrics to check the latency of my jobs.
But sometimes I observed that the operator's latency metrics keep increasing in my job.
The operator's latency increases by approximately 2.7 minutes per day (please see the attached screenshots).

My job's logic is simple: it just distributes data from a Kafka source to a BucketingSink.
So I checked the consumer offsets in Kafka for the consumer group, and I also checked the latest data in HDFS. In fact, there is no serious latency in my job.
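For context, a minimal sketch of what the job roughly looks like (written against the Flink 1.5-era connector APIs; topic, group id, brokers and HDFS path are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class KafkaToHdfsJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");   // placeholder

        // Read from Kafka and write the records into bucketed files on HDFS.
        env.addSource(new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props))
           .addSink(new BucketingSink<String>("hdfs:///data/output")); // placeholder path

        env.execute("kafka-to-hdfs");
    }
}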

I noticed that the latency is computed as currentTimeMillis minus the LatencyMarker's markedTime.
But the LatencyMarker's timestamp comes from RepeatedTriggerTask's nextTimestamp, which is computed by adding a period (the default value is 2s before v1.5.5). The nextTimestamp gets delayed when a JVM GC pause or Linux preemptive scheduling happens, so as time goes on, nextTimestamp falls further and further behind the current time (I verified this via a JVM heap dump).
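For reference, the period I mention is the latency tracking interval; as I understand it, it can be set from the job like this (the value here is only an example, not the default):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyIntervalConfig {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Interval (in milliseconds) at which latency markers are emitted;
        // 60_000 is just an example value for illustration.
        env.getConfig().setLatencyTrackingInterval(60_000L);
    }
}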

We could avoid the above situation by directly relying on the machine's NTP-synchronized clock to guarantee accuracy, instead of computing the timestamp inside the process.
I'm not very familiar with SystemProcessingTimeService. Is there some detail I haven't thought about?
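To make the idea concrete, here is a rough sketch (not Flink's actual code; the class and method names are invented) contrasting a timestamp derived by adding the period with one read from the NTP-synchronized wall clock at emission time:

public class PeriodicMarkerEmitter {

    private final long periodMs;
    private long nextTimestamp;

    PeriodicMarkerEmitter(long periodMs) {
        this.periodMs = periodMs;
        this.nextTimestamp = System.currentTimeMillis();
    }

    // Variant A: derive the marked time by repeatedly adding the period.
    // If firings are delayed, or the scheduler's clock drifts from the wall clock,
    // this value can fall further and further behind System.currentTimeMillis.
    long markedTimeByCounter() {
        long marked = nextTimestamp;
        nextTimestamp += periodMs;
        return marked;
    }

    // Variant B: read the (NTP-disciplined) wall clock at emission time,
    // so the marked time cannot drift away from the clock that is later
    // used to compute the latency.
    long markedTimeByWallClock() {
        return System.currentTimeMillis();
    }
}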


Best regards and thanks for your help.
Suxing Lee