Hi Averell,

One comment regarding what you said:

> As my files are small, I think there would not be much benefit in checkpointing file offset state.

Checkpointing is not about efficiency but about consistency. If the position in a split is not checkpointed, your application won't operate with exactly-once state consistency unless each split produces exactly one record.

Best, Fabian

2018-08-10 9:10 GMT+02:00 Jörn Franke <[hidden email]>:
|
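Fabian's point about consistency can be illustrated with a small, self-contained Java sketch. It is not Flink code: the class `SplitPositionSketch`, the method `countWithRecovery`, and the numbers are hypothetical and only model a single split that is counted, checkpointed once, and re-read after a simulated crash. It assumes `0 < checkpointAfter <= failAfter <= splitSize`.

```java
/** Toy model of one split being read with a crash in the middle; not Flink code. */
class SplitPositionSketch {

    /**
     * Counts the records of a single split. A checkpoint is taken after
     * `checkpointAfter` records, the job crashes after `failAfter` records,
     * and the split is then re-read after restoring the checkpointed count.
     * Assumes 0 < checkpointAfter <= failAfter <= splitSize.
     */
    static long countWithRecovery(int splitSize, int checkpointAfter,
                                  int failAfter, boolean positionCheckpointed) {
        long count = 0;          // operator state: number of records counted
        int position = 0;        // read position inside the split
        long savedCount = 0;     // checkpointed copy of the state
        int savedPosition = 0;   // checkpointed copy of the position

        // First attempt: read until the simulated crash.
        while (position < failAfter) {
            count++;
            position++;
            if (position == checkpointAfter) {
                savedCount = count;
                savedPosition = position;
            }
        }

        // Recovery: the state is always reset to the checkpoint; the read
        // position is reset only if it was part of the checkpoint.
        count = savedCount;
        position = positionCheckpointed ? savedPosition : 0;

        // Second attempt: read the rest of the split without failing.
        while (position < splitSize) {
            count++;
            position++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countWithRecovery(10, 3, 5, true));   // prints 10
        System.out.println(countWithRecovery(10, 3, 5, false));  // prints 13
    }
}
```

With the position checkpointed, the count matches the 10 records in the split. Without it, the 3 records counted before the checkpoint are counted again, which is the loss of exactly-once state consistency described above.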
Thank you Vino, Jorn, and Fabian.
Please forgive my ignorance, as I am still not able to fully understand state/checkpointing and the statement that Fabian gave earlier:

"In either case, some record will be read twice but if reading position can be reset, you can still have exactly-once state consistency because the state is reset as well."

My current understanding is that checkpointing is managed at the execution-environment level and happens at the same time at all the operators of the pipeline. Is this true? My concern is how that synchronization is managed. It seems quite possible that checkpointing happens a few milliseconds apart at different operators, which would lead to duplicated or missed records, wouldn't it?

I tried to read Flink's documentation about managing state here <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html>. However, I have not been able to find the information I am looking for. Please help point me to the right place.

Thanks and best regards, Averell.

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
|
Hi Averell,

Conceptually, you are right. Checkpoints are taken at every operator at the same "logical" time. It is not important that each operator checkpoints at the same wall-clock time. Instead, they need to take a checkpoint when they have processed the same input.

This is implemented with so-called Checkpoint Barriers, which are special records that are injected at the sources. [Simplification] When an operator receives a barrier it performs a checkpoint. [/Simplification] This way, we do not need to pause the processing of all operators but can perform the checkpoints locally for each operator.

This page of the Internal docs should help to understand how the mechanism works in detail [1].

Best, Fabian

2018-08-10 14:43 GMT+02:00 Averell <[hidden email]>:
> Thank you Vino, Jorn, and Fabian.
|
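The simplified barrier mechanism described above can be sketched in plain Java. This is a toy model, not Flink code: `BarrierSketch`, `CountingOperator`, and `source` are hypothetical names, there is a single input channel, and barrier alignment across multiple inputs is left out. It only shows that two operators which run at different wall-clock times still snapshot their state at the same logical point in the stream.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of checkpoint barriers; not Flink code, all names are hypothetical. */
class BarrierSketch {

    /** Marker object standing in for a checkpoint barrier in the record stream. */
    static final Object BARRIER = new Object();

    /** Counts records and snapshots its count whenever a barrier arrives. */
    static final class CountingOperator {
        long count = 0;
        final List<Long> snapshots = new ArrayList<>();

        List<Object> process(List<Object> input) {
            List<Object> output = new ArrayList<>();
            for (Object element : input) {
                if (element == BARRIER) {
                    snapshots.add(count);  // local checkpoint at this logical point
                } else {
                    count++;               // regular record: update the state
                }
                output.add(element);       // forward records and barriers downstream
            }
            return output;
        }
    }

    /** Emits `records` integers and injects one barrier after `barrierAfter` of them. */
    static List<Object> source(int records, int barrierAfter) {
        List<Object> stream = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            if (i == barrierAfter) {
                stream.add(BARRIER);
            }
            stream.add(i);
        }
        return stream;
    }

    public static void main(String[] args) {
        CountingOperator first = new CountingOperator();
        CountingOperator second = new CountingOperator();
        // The second operator only starts after the first one has finished,
        // so the two snapshots are taken at different wall-clock times.
        second.process(first.process(source(5, 3)));
        System.out.println(first.snapshots + " " + second.snapshots);  // prints [3] [3]
    }
}
```

Both operators record a count of 3 in their snapshot because the barrier sits between the third and fourth record for each of them, regardless of when they get to it.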
Thank you Fabian.
It is clear to me now. Thanks a lot for your help. Regards, Averell |
In reply to this post by Fabian Hueske-2
Hello Fabian, and all,
Please excuse me for digging this old thread up. I have a question regarding the sending of the "barrier" messages in Flink's checkpointing mechanism: I want to know when those barrier messages are sent when I am using a file source. Where can I find it in the source code?

I am still dealing with my 20,000-small-files issue, where all 20,000 files appear to the ContinuousFileMonitoringFunction at the same time. It takes only a few seconds to list all those files, but it is expected to take about 5 minutes to have those 20K files processed through to my sink. Due to a resource limitation, my job fails after about 3 minutes. After that, the job crashes, gets restored, tries to process all 20K files from file 1 again, and ultimately fails again after 3 minutes. It goes into an infinite loop.

I think that this is the expected behaviour, as my current checkpoint config is to checkpoint every 10s, and it took only a second or two to list those 20K files. Am I correct here? And do we have a solution for this?

Thanks and best regards, Averell
|
Hi Averell,

Barriers are injected into the regular data flow by source functions. In the case of a file monitoring source, the barriers are injected into the stream of file splits that the ContinuousFileMonitoringFunction (CFMF) passes to the ContinuousFileReaderOperator (CFRO).

The CFRO puts the splits into a queue and processes them with a dedicated split reader thread. All state-modifying operations of the thread (emitting a record, opening a new split, etc.) are guarded by a checkpoint lock. When the CFRO receives a barrier, the checkpointing logic requests the lock and forces the split reader thread to pause. Then it requests the current state of the thread and writes it into its checkpoint.

In order to properly checkpoint the state of the reading thread within a split, the InputFormat that is used to read the files must implement the CheckpointableInputFormat interface. Otherwise, a split will be read from the start after recovery.

Best, Fabian

On Mon, 27 Aug 2018 at 10:55, Averell <[hidden email]> wrote:
> Hello Fabian, and all,
|
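The role of the checkpoint lock can be sketched with plain Java threads. This is a simplified model under stated assumptions, not the Flink implementation: `CheckpointLockSketch`, `readSplit`, `snapshot`, and `runDemo` are hypothetical names, there is only one split, and the snapshot is just a pair of numbers. The point is that the reader changes its state only while holding the lock, so a snapshot taken under the same lock never sees a record that was emitted without the matching position update.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a split reader guarded by a checkpoint lock; not Flink code. */
class CheckpointLockSketch {
    private final Object checkpointLock = new Object();
    private final List<Integer> emitted = new ArrayList<>();
    private int offset = 0;  // read position inside the current split

    /** Body of the reader thread: every state change happens under the lock. */
    void readSplit(int records) {
        for (int i = 0; i < records; i++) {
            synchronized (checkpointLock) {
                emitted.add(i);  // emit the record
                offset++;        // advance the position in the same critical section
            }
        }
    }

    /** Checkpoint logic: pause the reader by taking the lock, then copy its state. */
    int[] snapshot() {
        synchronized (checkpointLock) {
            return new int[] {offset, emitted.size()};
        }
    }

    /** Runs a reader thread while taking snapshots; true if every snapshot was consistent. */
    static boolean runDemo(int records) {
        CheckpointLockSketch reader = new CheckpointLockSketch();
        Thread thread = new Thread(() -> reader.readSplit(records));
        thread.start();
        boolean consistent = true;
        while (thread.isAlive()) {
            int[] state = reader.snapshot();
            consistent &= state[0] == state[1];  // position must match emitted count
        }
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        int[] last = reader.snapshot();
        return consistent && last[0] == records && last[1] == records;
    }

    public static void main(String[] args) {
        System.out.println("snapshots consistent: " + runDemo(200_000));
    }
}
```

A restored reader would continue from the snapshotted `offset` instead of the start of the split, which is what a checkpointable input format makes possible in the description above.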
Hello Fabian,
Thanks for the answer. However, my question is a little bit different. Let me rephrase my example and my question:

* I have 10,000 unsplittable small files to read, which in total produce about 10M output lines.
* From Flink's reporting web GUI, I can see that CFMF and ContinuousFileReaderOperator (CFRO) are reported separately.
  - CFMF needs about 10 seconds to generate all 10,000 records (as you said, in this case, 1 record = 1 file split).
  - CFRO generates about 2M records per minute (which means CFRO is processing at the rate of 2,000 files per minute).
* I set the checkpointing interval to 1 minute.

In this example, will the 1st barrier be injected into the stream of file splits 50 seconds after the 10,000th split, or after the 2,000th one?

Sorry for being confusing.

Thanks and best regards, Averell
|
Hi,

The reader operator (CFRO) is not a source; only the file monitoring function (FMF) is. Barriers are injected by the FMF when the JobManager (JM) sends a checkpoint message. The barriers then travel to the CFRO and trigger the checkpointing.

Fabian

On Tue, 28 Aug 2018 at 12:02, Averell <[hidden email]> wrote:
> Hello Fabian,
|
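Applied to the numbers in the question, this answer can be turned into a small back-of-the-envelope calculation. The sketch below is an illustration only, not Flink code: `BarrierPositionSketch` and `splitsBeforeFirstBarrier` are hypothetical names, and it assumes a constant split emission rate and a first checkpoint trigger exactly one interval after the job starts. It says nothing about how long the reader then needs to work through the splits it has received.

```java
/** Back-of-the-envelope model of where the first barrier lands; not Flink code. */
class BarrierPositionSketch {

    /**
     * Number of splits the source has already emitted when the first
     * checkpoint is triggered, assuming a constant emission rate and a first
     * trigger exactly one interval after the job starts.
     */
    static long splitsBeforeFirstBarrier(long totalSplits, long splitsPerSecond,
                                         long checkpointIntervalSeconds) {
        return Math.min(totalSplits, splitsPerSecond * checkpointIntervalSeconds);
    }

    public static void main(String[] args) {
        // 10,000 splits listed in about 10 s (1,000 splits/s), checkpoint interval 60 s.
        System.out.println(splitsBeforeFirstBarrier(10_000, 1_000, 60));  // prints 10000
    }
}
```

Under these assumptions the first barrier follows the 10,000th split, because the source has finished listing long before the first trigger arrives. The same holds for the earlier 20,000-file case with a 10-second interval, assuming the listing takes about 2 seconds (roughly 10,000 splits per second).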
Thank you Fabian.
Regards, Averell |