On 28.09.2017 at 11:25, Tony Wei <[hidden email]> wrote:

Hi Stefan,

Here is some telemetry information, but I don't have any historical information about GC.

<image: 2017-09-28 下午4.51.26.png>
<image: 2017-09-28 下午4.51.11.png>

1) Yes, my state is not large.
2) My DFS is S3, but my cluster is outside of AWS. That might be a problem. Since this is a POC, we might move to AWS in the future or use HDFS in the same cluster. However, how can I tell whether this is the problem?
3) It seems memory usage is bounded. I'm not sure if the status shown above is fine.

There is only one TM in my cluster for now, so all tasks are running on that machine. I think that means they are in the same JVM, right?

Besides the asynchronous part taking so long, another question is that the late message showed that this task was delayed for almost 7 minutes, but the log showed it only took 4 minutes.
It seems that it was somehow waiting to be executed. Are there any pointers for finding out what happened?

As for the log information, what I mean is that it is hard to recognize which checkpoint id an asynchronous part belongs to if the checkpoint takes more time and there are more concurrent checkpoints taking place.
Also, it seems that the asynchronous part might not be executed right away if there is no resource available from the thread pool. It would be better to measure the time between creation and processing, and log it together with the checkpoint id in the original log line that shows how long the asynchronous part took.

Best Regards,
Tony Wei

2017-09-28 16:25 GMT+08:00 Stefan Richter <[hidden email]>:

Hi,

when the async part takes that long I would have 3 things to look at:

1) Is your state so large? I don’t think this applies in your case, right?
2) Is something wrong with writing to DFS (network, disks, etc)?
3) Are we running low on memory on that task manager?

Do you have telemetry information about used heap and gc pressure on the problematic task?
However, what speaks against the memory problem hypothesis is that future checkpoints seem to go through again. What I find very strange is that within the reported 4 minutes of the async part the only thing that happens is: open dfs output stream, iterate the in-memory state and write serialized state data to dfs stream, then close the stream. No locks or waits in that section, so I would assume that for one of the three reasons I gave, writing the state is terribly slow.

Those snapshots should be able to run concurrently, for example so that users can also take savepoints even when a checkpoint was triggered and is still running, so there is no way to guarantee that the previous parts have finished; this is expected behaviour. Which waiting times are you missing in the log? I think the information about when a checkpoint is triggered, received by the TM, performing the sync and async part and acknowledgement time should all be there?

Best,
Stefan

On 28.09.2017 at 08:18, Tony Wei <[hidden email]> wrote:

Hi Stefan,

The checkpoint on my job has been subsumed again. There are some things that I don't understand.

Log in JM:

2017-09-27 13:45:15,686 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
2017-09-27 13:49:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1577 @ 1506520182795
2017-09-27 13:54:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1578 @ 1506520482795
2017-09-27 13:55:13,105 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
2017-09-27 13:56:37,103 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1577 from 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
2017-09-27 13:59:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1579 @ 1506520782795

Log in TM:

2017-09-27 13:56:37,105 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2, asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task Threads] took 240248 ms.

I think the log in TM might be the late message for #1577 in JM, because #1576 and #1578 had finished and #1579 hadn't started at 13:56:37.
If I am not mistaken, I am wondering why it took 240248 ms (4 min). It seems that it started later than the asynchronous tasks in #1578.
Is there any way to guarantee that the asynchronous parts of earlier checkpoints are executed before those of later ones?

Moreover, I think it would be better to have more information in the INFO log, such as waiting time and checkpoint id, in order to trace the progress of a checkpoint conveniently.

What do you think? Do you have any suggestions for dealing with these problems? Thank you.

Best Regards,
Tony Wei

2017-09-27 17:11 GMT+08:00 Tony Wei <[hidden email]>:

Hi Stefan,

Here is the summary of my streaming job's checkpoints after restarting last night.

<image: 2017-09-27 下午4.56.30.png>

This is the distribution of alignment buffered over the last 12 hours.

<image: 2017-09-27 下午5.05.11.png>

And here is the buffer out pool usage during chk #1140 ~ #1142. For chk #1245 and #1246, you can check the picture I sent before.

<image: 2017-09-27 下午5.01.24.png>

AFAIK, the back pressure rate is usually in LOW status, sometimes goes up to HIGH, and is always OK during the night.

Best Regards,
Tony Wei

2017-09-27 16:54 GMT+08:00 Stefan Richter <[hidden email]>:

Hi Tony,

are your checkpoints typically close to the timeout boundary? From what I see, writing the checkpoint is relatively fast but the time from the checkpoint trigger to execution seems very long.
This is typically the case if your job has a lot of backpressure and therefore the checkpoint barriers take a long time to travel to the operators, because a lot of events are piling up in the buffers. Do you also experience large alignments for your checkpoints?

Best,
Stefan

On 27.09.2017 at 10:43, Tony Wei <[hidden email]> wrote:

Hi Stefan,

It seems that I found something strange in the JM's log.

It had happened more than once before, but all subtasks would finish their checkpoint attempts in the end.

2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1140 @ 1506389008690
2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1141 @ 1506389308690
2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1142 @ 1506389608690
2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1140 expired before completing.
2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1141 expired before completing.
2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1140 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1141 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1142 (136733704 bytes in 457413 ms).

For chk #1245 and #1246, there was no late message from TM. You can refer to the TM log. The full completed checkpoint attempt will have 12 (...
asynchronous part) logs in general, but #1245 and #1246 only got 10 logs.

2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1245 @ 1506420508690
2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1246 @ 1506420808690
2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1245 expired before completing.
2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1246 expired before completing.

Moreover, I listed the checkpoint directory on S3 and saw there were two states not discarded successfully. In general, there are 16 parts for a completed checkpoint state.

2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6

I hope this information is helpful. Thank you.

Best Regards,
Tony Wei

2017-09-27 16:14 GMT+08:00 Stefan Richter <[hidden email]>:

Hi,

thanks for the information. Unfortunately, I have no immediate idea what the reason is from the given information. I think most helpful could be a thread dump, but also metrics on the operator level to figure out which part of the pipeline is the culprit.

Best,
Stefan

On 26.09.2017 at 17:55, Tony Wei <[hidden email]> wrote:

Hi Stefan,

There is no unknown exception in my full log. The Flink version is 1.3.2.
My job is roughly like this.

env.addSource(Kafka).map(ParseKeyFromRecord).keyBy().process(CountAndTimeoutWindow).asyncIO(UploadToS3).addSink(UpdateDatabase)

It seemed all tasks stopped, like the picture I sent in the last email.

I will keep an eye out and take a thread dump from that JVM if this happens again.

Best Regards,
Tony Wei

2017-09-26 23:46 GMT+08:00 Stefan Richter <[hidden email]>:

Hi,
that is very strange indeed. I had a look at the logs and there is no error or exception reported. I assume there is also no exception in your full logs? Which version of flink are you using and what operators were running in the task that stopped? If this happens again, would it be possible to take a thread dump from that JVM?
Best,
Stefan
> On 26.09.2017 at 17:08, Tony Wei <[hidden email]> wrote:
>
> Hi,
>
> Something weird happened on my streaming job.
>
> I found that my streaming job seemed to be blocked for a long time, and I saw the situation shown in the picture below. (chk #1245 and #1246 each finished 7/8 tasks and were then marked as timed out by the JM. Other checkpoints, like #1247, failed in the same state until I restarted the TM.)
>
> <snapshot.png>
>
> I'm not sure what happened, but the consumer stopped fetching records, buffer usage is 100% and the following task did not seem to fetch data anymore. Just like the whole TM was stopped.
>
> However, after I restarted the TM and forced the job to restart from the latest completed checkpoint, everything worked again. And I don't know how to reproduce it.
>
> The attachment is my TM log. Because there are many user logs and sensitive information, I only kept the log lines from `org.apache.flink...`.
>
> My cluster setting is one JM and one TM with 4 available slots.
>
> Streaming job uses all slots, checkpoint interval is 5 mins and max concurrent number is 3.
>
> Please let me know if you need more information to find out what happened on my streaming job. Thanks for your help.
>
> Best Regards,
> Tony Wei
> <flink-root-taskmanager-0-partial.log>
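
The 4-minute versus 7-minute discrepancy raised for checkpoint #1577 can be checked with a little arithmetic on the timestamps quoted in the thread. The sketch below is only an illustration: it assumes, as Tony suspects, that the TM log line belongs to #1577, and that the JM and TM clocks agree. The variable names are made up for this example.

```python
from datetime import datetime, timedelta

# Timestamp layout used by the log lines quoted in this thread.
FMT = "%Y-%m-%d %H:%M:%S,%f"


def parse(ts):
    """Parse a log timestamp such as '2017-09-27 13:49:42,795'."""
    return datetime.strptime(ts, FMT)


# JM log: "Triggering checkpoint 1577 @ 1506520182795"
triggered_1577 = parse("2017-09-27 13:49:42,795")

# TM log: "... asynchronous part) ... took 240248 ms."
# Assumption: this line belongs to checkpoint 1577 (the log does not say so).
async_logged = parse("2017-09-27 13:56:37,105")
async_duration = timedelta(milliseconds=240248)

# The async part is logged when it finishes, so subtract its duration
# to estimate when it started.
async_started = async_logged - async_duration

# Time between the trigger and the start of the async part.
wait_before_async = async_started - triggered_1577

# Time between the trigger and the end of the async part.
total_delay = async_logged - triggered_1577

print("async part started at:", async_started.time())
print("seconds before async part:", wait_before_async.total_seconds())
print("seconds from trigger to end:", total_delay.total_seconds())
```

Under those assumptions the asynchronous part started roughly 2 min 54 s after the trigger, so about 174 s of the roughly 414 s total passed before the async write began. That interval covers barrier travel, alignment and the synchronous part as well as any queuing in the thread pool; the logs quoted here do not show how it splits.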