FLINK Kinesis consumer Checkpointing data loss


FLINK Kinesis consumer Checkpointing data loss

Vijayendra Yadav
Hi Team,

We are trying to make sure we do not lose data when the Kinesis consumer is down.

Our Kinesis streaming job has the following checkpointing properties:

// checkpoint every X msecs
env.enableCheckpointing(Conf.getFlinkCheckpointInterval());

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// allow job recovery fallback to checkpoint when there is a more recent savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());

// enable the experimental unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();

// checkpoint path
env.setStateBackend(new FsStateBackend(Conf.getFlinkCheckPointPath(), true));

1) We killed the Kinesis job.
2) Sent messages to KDS while the consumer was down.
3) Restarted the Flink consumer; the messages sent during the downtime were never ingested (data loss).
4) Re-sent messages to KDS while the consumer was up; those messages were ingested fine.

How can I avoid the data loss in step 3?

From the logs:

2021-04-07 12:15:49,161 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using application-defined state backend: File State Backend (checkpoints: 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous: TRUE, fileStateThreshold: -1)

2021-04-07 12:16:02,343 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591 ms).
2021-04-07 12:16:11,951 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job 8943d16e22b8aaf65d6b9e2b8bd54113.
2021-04-07 12:16:12,483 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411 ms).

Thanks,
Vijay

Re: FLINK Kinesis consumer Checkpointing data loss

Arvid Heise-4
Hi Vijay,

Edit: after re-reading your message, are you sure that you restarted from a checkpoint/savepoint? If you just start the application anew and use the LATEST initial position, this is the expected behavior.
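
For context, here is a minimal sketch of how the initial position is usually configured on the FlinkKinesisConsumer (the stream name, region, and the choice of LATEST are placeholders for illustration, not taken from your job; it also assumes the same env as in your snippet):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

Properties consumerConfig = new Properties();
// region of the Kinesis stream (placeholder)
consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
// where a job WITHOUT restored state starts reading:
// LATEST skips everything produced while the job was down,
// TRIM_HORIZON starts from the oldest record still retained in the stream
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

DataStream<String> input = env.addSource(
        new FlinkKinesisConsumer<>("your-stream-name", new SimpleStringSchema(), consumerConfig));

The initial position only matters when there is no restored state; once the job is restored from a checkpoint or savepoint, the consumer continues from the sequence numbers stored in that state.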

--- original answer, assuming you did restart from a checkpoint ---

This is definitely not the expected behavior.

To exclude some error sources:
- Could you double-check whether this also happens when you don't use unaligned checkpoints? (I don't really think unaligned checkpoints are the cause, but it's better to be sure and to narrow down the possible error sources.)
- Can you still see the missing messages in Kinesis?
- Could you extract all INFO log statements from org.apache.flink.streaming.connectors.kinesis and attach them here?
- How long did you wait before recovering?




Re: FLINK Kinesis consumer Checkpointing data loss

Vijayendra Yadav
Hi Arvid,

Thanks for your response. I did not restart from the checkpoint; I assumed Flink would automatically look for the latest checkpoint on restart.

Should I restart like below?

bin/flink run  -s s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/ \

Thanks,
Vijay


Re: FLINK Kinesis consumer Checkpointing data loss

Vijayendra Yadav
Thanks, it worked fine with: bin/flink run  -s s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/ \


Re: FLINK Kinesis consumer Checkpointing data loss

Arvid Heise-4
Hi Vijay,

If you don't specify a checkpoint or savepoint when you submit the job, Flink assumes you want to start from scratch (e.g., because you had a bug in your business logic and need to start completely without state).

If there is any failure and Flink restarts automatically, it will always pick up from the latest checkpoint [1].
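
As a rough sketch of the two recovery paths (the restart strategy, attempt count, delay, and paths in the comments below are arbitrary placeholders, assuming the same env as in your snippet):

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;

// Automatic recovery: when a task fails, Flink restarts the job according to the
// configured restart strategy and restores it from the latest completed checkpoint.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                                // restart attempts (placeholder)
        Time.of(10, TimeUnit.SECONDS)));  // delay between attempts (placeholder)

// Manual resubmission: a plain "bin/flink run job.jar" starts with empty state.
// To continue from a retained checkpoint, point the client at it explicitly, e.g.
//   bin/flink run -s s3://<bucket>/<checkpoint-dir>/chk-<n>/ job.jar
// The chk-<n> directory must contain the _metadata file of a completed checkpoint.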



Re: FLINK Kinesis consumer Checkpointing data loss

Vijayendra Yadav
Thank you, it helped.

