Hi,
While restoring from the latest checkpoint starts immediately after the job is restarted, restoring from a savepoint takes more than five minutes until the job makes progress. During that blackout, I cannot observe any resource usage over the cluster. After that period of time, I observe that Flink tries to catch up with the progress in the source topic via various metrics, including flink_taskmanager_job_task_currentLowWatermark.

FYI, I'm using:
- Flink-1.4.2
- FsStateBackend configured with HDFS
- EventTime with BoundedOutOfOrdernessTimestampExtractor

A single checkpoint/savepoint instance is ~50GB in size, and we have 7 servers for taskmanagers.

Best,

- Dongwon
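For reference, here is a minimal sketch of the setup described above (event time with a BoundedOutOfOrdernessTimestampExtractor and an FsStateBackend on HDFS). The HDFS path, checkpoint interval, lateness bound, and the tiny stand-in source are all placeholders; the real job reads from Kafka.

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeFsBackendSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Event-time processing, as in the setup above.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // FsStateBackend writing checkpoint/savepoint data to HDFS (path is a placeholder).
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // Checkpoint interval is an assumption; the thread does not state it.
        env.enableCheckpointing(60_000L);

        // Tiny stand-in source; the real job consumes a Kafka topic.
        env.fromElements(1_000L, 2_000L, 3_000L)
           .assignTimestampsAndWatermarks(
               // Lateness bound of 10 seconds is an assumption.
               new BoundedOutOfOrdernessTimestampExtractor<Long>(Time.seconds(10)) {
                   @Override
                   public long extractTimestamp(Long element) {
                       return element; // the element doubles as its event timestamp here
                   }
               })
           .print();

        env.execute("event-time-fs-backend-sketch");
    }
}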
Attached is a log file from a taskmanager.
Please take a look at the log file considering the events below:
- Around 01:10:47: the job is submitted to the job manager.
- Around 01:16:30: the source suddenly starts reading from Kafka and the sink starts writing data to Kafka.

Any help would be greatly appreciated! T.T

Best,
- Dongwon
It was due to the parallelism being set too low.
I increased the parallelism enough (I actually set it to the total number of task slots on the cluster), and that makes restoring from a savepoint much faster.

This is somewhat related to the previous discussion I had with Robert and Aljoscha. Having a standalone cluster consisting of 7 taskmanagers, I wanted to schedule the session window tasks (each of which holds large state in memory) evenly over the 7 taskmanagers. To that end, I had set the number of session window tasks to 7, as a single core seems enough for the simple computation logic of our application. Luckily, 7 is smaller than ExecutionVertex.MAX_DISTINCT_LOCATIONS_TO_CONSIDER, which is hard-coded to 8, so the 7 tasks happened to be evenly scheduled on the 7 taskmanagers.

However, I found that, when restoring from a savepoint, each of the HDFS clients (the 7 session window tasks) reads a large file (operator state) from HDFS for a long period of time at 4MB per second. The slow speed seems to result from a single thread reading and deserializing each state entry of the state stored on HDFS. So I now use up the total number of task slots for a single streaming job on the standalone cluster.

Note that, if I set the number of session window tasks to somewhere between 7 and the total number of task slots, the tasks are scheduled on only a few taskmanagers and those taskmanagers die due to lack of memory. And I do not have SSDs, so I prefer FsStateBackend over RocksDBStateBackend. Of course, the standalone cluster cannot be shared across multiple jobs anymore as we don't have any free slots.

As it seems like the GA release of flink-1.5.0 is around the corner, I'm diverting my attention to Mesos and FLIP-6 for per-job clusters. One concern is that, AFAIK, multiple taskmanagers can be scheduled on the same node on Mesos.

@Eron Is this still an unsolved issue? If so, do you think it requires a lot of work to add Fenzo's uniqueness constraint to Flink's Mesos ResourceManager? I want to open an issue and figure it out (hopefully with your kind advice).

p.s. This time my Flink application has nothing to do with GPUs.

Best,

- Dongwon
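For illustration, a minimal sketch of the parallelism change described above, using an event-time session window. The event type, keying, session gap, and per-taskmanager slot count are hypothetical; the point is only that the window operator's parallelism is raised from 7 to the total number of task slots, so savepoint restore is spread over many more parallel HDFS readers.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SessionWindowParallelismSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Total slots across the 7 taskmanagers; 8 slots per taskmanager is an assumption.
        final int totalTaskSlots = 7 * 8;

        // Hypothetical (key, timestamp) pairs; the real job consumes a Kafka topic and
        // assigns timestamps with a BoundedOutOfOrdernessTimestampExtractor upstream.
        env.fromElements(Tuple2.of("user-a", 1_000L), Tuple2.of("user-a", 2_000L))
           .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
               @Override
               public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                   return element.f1; // second field doubles as the event timestamp here
               }
           })
           .keyBy(0)
           .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // gap is an assumption
           .sum(1)
           // Previously 7 (one task per taskmanager); raising this to the total slot
           // count spreads savepoint restore across far more parallel HDFS readers.
           .setParallelism(totalTaskSlots)
           .print();

        env.execute("session-window-parallelism-sketch");
    }
}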