Is it possible that checkpointing times out due to an operator taking too long? Also, does windowing affect the checkpoint barriers?
1) An operator that blocks for a long time (for example, because it makes a synchronous call to some external service) can indeed cause a checkpoint timeout.

2) What kind of effects are you worried about?
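For illustration, a blocking call of the kind described might look like the following; lookupPrice() and the surrounding class are hypothetical stand-ins for any synchronous external call:

import org.apache.flink.api.common.functions.MapFunction;

// Hypothetical sketch: a synchronous external call inside a MapFunction.
// While lookupPrice() blocks, the checkpoint barrier queued behind the
// current record cannot advance, so a slow service delays checkpointing.
public class BlockingEnrichment implements MapFunction<String, String> {
    @Override
    public String map(String symbol) throws Exception {
        double price = lookupPrice(symbol); // placeholder for blocking I/O
        return symbol + "=" + price;
    }

    private double lookupPrice(String symbol) throws Exception {
        Thread.sleep(5_000); // stand-in for a slow external service
        return 42.0;
    }
}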
Actually, perhaps I misworded it. This particular checkpoint failure seems to occur in an operator that is flat mapping (it is actually a keyed process function) a single blob data structure into several hundred thousand elements (sometimes a million) that immediately flow into a sink. I was speculating that the sink's writes to the database were taking too long and causing the checkpoint to fail, but I changed that sink into a print, and the checkpoint still failed, so it must be something else. I don't know the deep details of Flink's internals, but I am speculating that the data between this operator and the sink has to be checkpointed before the sink actually does something.
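The fan-out pattern described above is roughly the following; Blob, Element, and blob.elements() are hypothetical names standing in for the actual types:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch of the topology described: one keyed input record
// fans out into hundreds of thousands of output records. A checkpoint
// barrier arriving behind the blob must wait until every element has
// been emitted.
public class ExplodeBlob extends KeyedProcessFunction<String, Blob, Element> {
    @Override
    public void processElement(Blob blob, Context ctx, Collector<Element> out) {
        for (Element element : blob.elements()) { // sometimes ~1M elements
            out.collect(element);
        }
    }
}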
By default, a checkpoint times out after 10 minutes: if not all operators are able to confirm the checkpoint within that time, it is cancelled. If you have an operator that blocks for more than 10 minutes on a single record (because this record contains millions of elements that are written to an external system), then yes, this operator can cause your checkpoints to time out.
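Raising the timeout is a one-liner on the CheckpointConfig; the interval and the 30-minute timeout below are arbitrary example values:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds, and allow each checkpoint up to
        // 30 minutes before it is cancelled (the default is 10 minutes).
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);
    }
}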
I've seen checkpoints time out when using the RocksDB state backend with very large objects. The issue is that updating a ValueState<T> stored in RocksDB requires deserializing, updating, and then re-serializing that object -- and if that's some enormous collection type, that will be slow. In such cases it's much better to use ListState or MapState, if possible, or the filesystem state backend -- though the filesystem state backend has to copy those objects during checkpointing and needs plenty of memory.

Checkpoint barriers are not held up by windows. When the barrier reaches the head of the input queue, a snapshot is taken of the window's current state, and the barrier is forwarded downstream.
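As a minimal sketch of the ValueState-versus-ListState point, assume a keyed function that accumulates events per key (the names here are illustrative):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// With RocksDB, a ValueState<List<String>> is fully deserialized and
// re-serialized on every update. ListState only appends: each add()
// serializes just the new element.
public class AccumulateEvents extends KeyedProcessFunction<String, String, String> {
    private transient ListState<String> events;

    @Override
    public void open(Configuration parameters) {
        events = getRuntimeContext().getListState(
                new ListStateDescriptor<>("events", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        events.add(value); // cheap append; no full-list round trip
    }
}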
Hi Marco,

You actually hit one of the design flaws of checkpoints. The barrier can only be interleaved between records, so in your case the channel would contain something like <input1>, <barrier>, <input2>. The barrier then has to wait until input1 has been fully processed, which may take very long in your case.
No data is being added to the checkpoint (unless you use unaligned checkpoints, but note that even unaligned checkpoints don't help here). The only easy solution is to increase the checkpointing timeout and live with long checkpointing times.

A more elaborate solution would be to split the flatMap into two parts (if this is possible), so that instead of amplifying the record by a factor of 1M you have two amplification steps of 1k each. Then you can use unaligned checkpoints to get quicker checkpoints, as each flatMap can be checkpointed once its 1k records have been generated (please wait for the upcoming 1.12.2). For example, suppose you have a record containing a list with 1M entries. Instead of flattening it into 1M records directly, you first create chunks with a size of 1k, and in a second step flatten those chunks. The pipeline is then: source -> flatMap to chunks -> forward channel -> flatMap -> sink.

Footnote: we have often thought about allowing the user to suspend inside the flatMap so that things like barrier processing can happen, but it's unfortunately not that easy: snapshotting the operator state at that point is prone to fail, because the user has to make sure not to hold any state-relevant data while suspending, and that's hard to explain.
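A sketch of that two-stage fan-out, with the element type and chunk size invented for illustration:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Stage 1: split one huge list into chunks of ~1k elements.
public class ToChunks implements FlatMapFunction<List<String>, List<String>> {
    private static final int CHUNK_SIZE = 1_000;

    @Override
    public void flatMap(List<String> blob, Collector<List<String>> out) {
        for (int i = 0; i < blob.size(); i += CHUNK_SIZE) {
            out.collect(new ArrayList<>(
                    blob.subList(i, Math.min(i + CHUNK_SIZE, blob.size()))));
        }
    }
}

// Stage 2: flatten each chunk into individual records. With unaligned
// checkpoints, a barrier can overtake the chunks buffered between the
// two stages instead of waiting behind the full 1M-record fan-out.
public class FlattenChunk implements FlatMapFunction<List<String>, String> {
    @Override
    public void flatMap(List<String> chunk, Collector<String> out) {
        for (String element : chunk) {
            out.collect(element);
        }
    }
}

Wired together (names hypothetical): stream.flatMap(new ToChunks()).flatMap(new FlattenChunk()).addSink(...).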