Question about checkpointing with stateful operators and state recovery


Federico D'Ambrosio
Hi, I've got a couple of questions concerning the topics in the subject:

    1. If an operator is applied to a keyed stream, do I still have to implement the CheckpointedFunction trait and define the snapshotState and initializeState methods in order to successfully recover the state after a job failure?
   
    2. While using a FlinkKafkaConsumer, enabling checkpointing allows end-to-end exactly-once semantics, provided that the sink is able to guarantee the same. Do I have to set setCommitOffsetsOnCheckpoints(true)? How would someone implement exactly-once semantics in a sink?

    3. What are the advantages of externalized checkpoints, and in which cases would I want to use them?
 
    4. Let's suppose a scenario where checkpointing is enabled every 10 seconds, I have a Kafka consumer that is set to start from the latest records, a sink providing at-least-once semantics, and a stateful keyed operator in between the consumer and the sink. Is it correct that, in case of a task failure, the following happens?
        - the Kafka consumer gets reverted to the offset from the latest checkpoint (does this happen even if I don't set setCommitOffsetsOnCheckpoints(true)?)
        - the operator state gets reverted to the latest checkpoint
        - the sink is stateless, so it doesn't really care about what happened
        - the stream restarts, and probably some of the events reaching the sink have already been processed

Thank you for your attention,
Kind regards,
Federico

Re: Question about checkpointing with stateful operators and state recovery

Aljoscha Krettek
Hi Federico,

I'll try and give some answers:

1. Generally speaking, no. If you use keyed state, for example obtained via the RuntimeContext, Flink includes it in its checkpoints and restores it on recovery, so you don't need to implement CheckpointedFunction.
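
To make answer 1 concrete without needing a Flink installation, here is a toy Python model (not Flink's API). In it, the "runtime" owns the per-key state and snapshots and restores it, so the user function has no snapshot/restore hooks at all. In real Flink, the user function would be a rich function such as a `RichFlatMapFunction` on a keyed stream, and it would obtain a `ValueState` through `getRuntimeContext().getState(...)`. `ToyKeyedRuntime` and `running_count` are hypothetical names used only for illustration.

```python
# Toy model (NOT Flink's API): the runtime owns per-key state and snapshots /
# restores it itself, so the user function has no snapshot/restore hooks.
# ToyKeyedRuntime and running_count are hypothetical names.


class ToyKeyedRuntime:
    def __init__(self, user_fn):
        self.user_fn = user_fn
        self.state = {}       # key -> value, owned by the runtime, not the user
        self.checkpoint = {}  # copy of `state` at the last completed checkpoint

    def process(self, key, value):
        # The user function only ever sees the state scoped to the current key.
        new_state, output = self.user_fn(self.state.get(key), value)
        self.state[key] = new_state
        return output

    def take_checkpoint(self):
        self.checkpoint = dict(self.state)

    def recover(self):
        # After a failure the runtime, not the user code, restores the state.
        self.state = dict(self.checkpoint)


def running_count(state, _value):
    """User logic: count events per key. No snapshot/restore code needed."""
    count = (state or 0) + 1
    return count, count


rt = ToyKeyedRuntime(running_count)
for key in ["a", "a", "b"]:
    rt.process(key, None)
rt.take_checkpoint()          # checkpoint holds {"a": 2, "b": 1}
rt.process("a", None)         # "a" reaches 3, but this is not checkpointed
rt.recover()                  # simulated failure and recovery
print(rt.process("a", None))  # prints 3: "a" was rolled back to 2 first
```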

2. You don't have to set setCommitOffsetsOnCheckpoints(true). It only controls whether offsets are also committed back to Kafka, which is useful if other systems want to check those offsets; on recovery, Flink uses the offsets stored in its own checkpoints. To get exactly-once semantics you have two general paths: 1) your sink is idempotent, meaning it doesn't matter whether you write the same output multiple times, or 2) the sink is integrated with Flink checkpointing and transactions. Option 2) was not easily possible for Kafka until Kafka 0.11 introduced transaction support. Flink 1.4 will have a Kafka 0.11 producer that supports transactions, so with that you can have end-to-end exactly-once semantics.
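
A toy Python model (not Flink's API) of the two paths in answer 2. It replays records after a simulated failure, as a source rewinding to its checkpointed offset would. It compares three sinks:

- a plain appending sink;
- an idempotent sink that upserts by a deterministic record key;
- a transactional sink that only makes writes visible when a checkpoint completes and aborts them on failure.

The transactional sink loosely mirrors the idea behind Flink 1.4's `TwoPhaseCommitSinkFunction`, but it collapses the pre-commit and commit phases into a single step. All class and function names here are hypothetical.

```python
# Toy model (NOT Flink's API): why replay after recovery duplicates output, and
# how an idempotent or a transactional sink avoids it. All names are hypothetical.


class Sink:
    """Base class with no-op checkpoint/failure hooks."""

    def write(self, key, value):
        raise NotImplementedError

    def on_checkpoint_complete(self):
        pass

    def on_failure(self):
        pass


class AppendSink(Sink):
    """Not idempotent: every write appends, so replayed records are duplicated."""

    def __init__(self):
        self.rows = []

    def write(self, key, value):
        self.rows.append((key, value))


class IdempotentSink(Sink):
    """Path 1): upsert by a deterministic key, so rewriting a record is harmless."""

    def __init__(self):
        self.rows = {}

    def write(self, key, value):
        self.rows[key] = value


class TransactionalSink(Sink):
    """Path 2): writes stay in an open transaction that is committed only when
    a checkpoint completes and is aborted on failure (simplified to one phase)."""

    def __init__(self):
        self.committed = []
        self.pending = []

    def write(self, key, value):
        self.pending.append((key, value))

    def on_checkpoint_complete(self):
        self.committed.extend(self.pending)
        self.pending = []

    def on_failure(self):
        self.pending = []  # abort the open transaction


def run_with_failure(sink, records, checkpoint_after, fail_after):
    """Checkpoint after index `checkpoint_after`, fail after index `fail_after`,
    then replay from the checkpointed position, as a rewinding source would."""
    for i, (key, value) in enumerate(records[: fail_after + 1]):
        sink.write(key, value)
        if i == checkpoint_after:
            sink.on_checkpoint_complete()
    sink.on_failure()
    for key, value in records[checkpoint_after + 1:]:
        sink.write(key, value)
    sink.on_checkpoint_complete()  # first checkpoint after recovery
    return sink


records = [("e1", 10), ("e2", 20), ("e3", 30), ("e4", 40)]
append = run_with_failure(AppendSink(), records, checkpoint_after=1, fail_after=2)
upsert = run_with_failure(IdempotentSink(), records, checkpoint_after=1, fail_after=2)
txn = run_with_failure(TransactionalSink(), records, checkpoint_after=1, fail_after=2)
print(len(append.rows), len(upsert.rows), len(txn.committed))  # prints 5 4 4
```

The idempotent variant only works if the key is deterministic, for example an event ID carried in the record. A key generated at write time would produce a new row on every replay.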

3. The advantage of externalised checkpoints is that they are not deleted when you cancel a job, so you can later resume the job from them. Regular checkpoints, by contrast, are deleted when you manually cancel a job. There are plans to make all checkpoints "externalised" in Flink 1.4.

4. Yes, you are correct. :-)
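
A toy Python model (not Flink's API) of the scenario in question 4. It assumes the checkpoint interval can be counted in records rather than seconds. The model shows the behaviour confirmed above. After recovery, the source offset and the keyed state roll back together to the last checkpoint, regardless of any offset committing to Kafka. The final state therefore matches a failure-free run, while the at-least-once sink receives some records a second time. `run` and `fail_at` are hypothetical names.

```python
# Toy model (NOT Flink's API) of scenario 4: a replayable source standing in for
# the Kafka consumer, a keyed running count, and an at-least-once sink.
# The checkpoint interval is counted in records instead of seconds, and
# `run` / `fail_at` are hypothetical names used only for illustration.


def run(events, checkpoint_every, fail_at=None):
    offset = 0            # next position to read from the replayable "topic"
    counts = {}           # keyed operator state: key -> running count
    checkpoint = (0, {})  # last completed checkpoint: (source offset, state)
    sink = []             # at-least-once sink: appends whatever it receives
    failed = False
    while offset < len(events):
        if offset == fail_at and not failed:
            failed = True
            # Recovery: the source offset and the operator state both roll back
            # to the last completed checkpoint; the sink keeps what it wrote.
            offset, counts = checkpoint[0], dict(checkpoint[1])
            continue
        key = events[offset]
        counts[key] = counts.get(key, 0) + 1
        sink.append((key, counts[key]))
        offset += 1
        if offset % checkpoint_every == 0:
            checkpoint = (offset, dict(counts))
    return counts, sink


events = ["a", "b", "a", "a", "b"]
clean_counts, clean_sink = run(events, checkpoint_every=2)
counts, sink = run(events, checkpoint_every=2, fail_at=3)
print(counts == clean_counts)      # prints True: the keyed state is exactly-once
print(len(clean_sink), len(sink))  # prints 5 6: the sink received ("a", 2) twice
```

The duplicated ("a", 2) is the effect described in the last bullet of question 4. Pairing the same pipeline with an idempotent sink would make that duplicate harmless.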

Best,
Aljoscha

> On 28. Sep 2017, at 11:46, Federico D'Ambrosio wrote: