Problem with Flink restoring from checkpoints

Francisco Blaya
Hi,

We have a Flink job running on AWS EMR that sources a Kafka topic and persists the events to S3 through a DateTimeBucketer. We configured the bucketer to flush to S3 after an inactivity period of 5 mins. The rate at which events are written to Kafka is very low, so it is easy for us to investigate how the Flink job recovers with respect to Kafka offsets after the job is cancelled or the YARN session is killed.

What we found is that Flink acks Kafka immediately, before even writing to S3. The consequence seems to be that if the job is cancelled before the acked events are flushed to S3, those events are lost: they are not written when the job restarts. Flink doesn't seem to keep in its checkpointed state the fact that it acked those events but never flushed them to S3. Checkpoints are created every 5 seconds in S3.

We've also tried configuring externalized checkpoints through the "state.checkpoints.dir" configuration key and "env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)" in the job, so that they don't get cleaned up automatically when the job is cancelled or the YARN session is killed. We can see that the job uses a restored checkpoint upon restart, but we still get missing events in S3.
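For readers trying to reproduce this, the setup described above might look roughly like the following sketch against the Flink 1.2 Scala API. The Kafka 0.10 connector, topic name, broker address, group id, S3 path, date format and check interval are all assumptions for illustration; the thread does not state them.

```scala
import java.util.Properties

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaToS3Job {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint every 5 seconds, as in the setup described above.
    env.enableCheckpointing(5000L)
    // Retain checkpoint metadata on cancellation. In Flink 1.2 this also requires
    // state.checkpoints.dir to be set in flink-conf.yaml.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Hypothetical connection settings.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka-broker:9092")
    props.setProperty("group.id", "s3-archiver")

    val events: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer010[String]("events", new SimpleStringSchema(), props))

    // Hypothetical S3 location; one bucket directory per hour.
    val sink = new BucketingSink[String]("s3://my-bucket/events")
    sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HH"))
    // Close part files that have received no writes for 5 minutes. Closed files
    // become pending and are only finalized when a later checkpoint completes.
    sink.setInactiveBucketThreshold(5 * 60 * 1000L)
    sink.setInactiveBucketCheckInterval(60 * 1000L)

    events.addSink(sink)
    env.execute("kafka-to-s3")
  }
}
```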

Has anyone come across this behaviour before? Are we assuming something wrong?

We're using EMR 5.4.0 and Flink 1.2.0.

Regards,
Fran

hivehome.com



Hive | London | Cambridge | Houston | Toronto
The information contained in or attached to this email is confidential and intended only for the use of the individual(s) to which it is addressed. It may contain information which is confidential and/or covered by legal professional or other privilege. The views expressed in this email are not necessarily the views of Centrica plc, and the company, its directors, officers or employees make no representation or accept any liability for their accuracy or completeness unless expressly stated to the contrary. 
Centrica Connected Home Limited (company no: 5782908), registered in England and Wales with its registered office at Millstream, Maidenhead Road, Windsor, Berkshire SL4 5GD.

Re: Problem with Flink restoring from checkpoints

Fabian Hueske-2
Hi Fran,

Did you observe actual data loss due to the problem you describe, or are you discussing a possible issue based on your observations?

AFAIK, Flink's Kafka consumer keeps track of the offsets itself and includes these in the checkpoints. In case of a recovery, it does not rely on the offsets which were committed back to Kafka but only on the offsets it checkpointed itself.
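The rule Fabian describes can be sketched as a hypothetical model (not the consumer's actual code): restored checkpoint state wins, and the offset committed to Kafka only matters when the job starts without restored state. For simplicity, both values are treated as "next offset to read" for a single partition.

```scala
// Hypothetical, simplified model of where a Kafka source resumes; not Flink code.
object OffsetModel {
  /** Next offset the toy consumer reads for one partition. */
  def startOffset(restoredNextOffset: Option[Long], committedNextOffset: Long): Long =
    // Restored Flink state takes precedence; Kafka's committed offset is ignored.
    restoredNextOffset.getOrElse(committedNextOffset)

  /** Offsets the toy consumer (re)reads up to the current end of the log. */
  def toRead(restoredNextOffset: Option[Long], committedNextOffset: Long, logEnd: Long): List[Long] =
    (startOffset(restoredNextOffset, committedNextOffset) until logEnd).toList
}
```

Restoring from a checkpoint whose next offset is 3 re-reads offsets 3 to 7 even though Kafka already has 7 committed, whereas a job started without restored state reads only from 7 onwards.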
Gordon (in CC) is familiar with all details of Flink's Kafka consumer and can give a more detailed answer.

Best, Fabian

2017-07-19 16:55 GMT+02:00 Francisco Blaya <[hidden email]>:

Re:Re: Problem with Flink restoring from checkpoints

gerryzhou
Hi Fran,

Does the DateTimeBucketer act like an in-memory buffer that isn't managed by Flink's state? If so, then I think the problem is not with Kafka but with the DateTimeBucketer: Flink won't snapshot the DateTimeBucketer's data if it isn't held in any Flink state.

Best, 
Sihua Zhou


At 2017-07-20 03:02:20, "Fabian Hueske" <[hidden email]> wrote:

Re:Re: Problem with Flink restoring from checkpoints

Tzu-Li (Gordon) Tai
Hi,

What Fabian mentioned is true. The Flink Kafka consumer's exactly-once guarantee relies on the offsets checkpointed as Flink state, not on the offsets committed to Kafka.

> What we found is that Flink acks Kafka immediately before even writing to S3.

By "ack" here you mean committing the offsets back to Kafka, correct?
First of all, as mentioned, exactly-once does not depend on that commit. In fact, in Flink 1.3 you can turn it off completely and still get exactly-once (with Flink checkpointing enabled).
One other thing to mention: offsets are committed to Kafka only when a checkpoint completes, i.e. after all sinks in the job have completed their state snapshots.
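For reference, turning the commit off in Flink 1.3 might look like the sketch below, assuming the Kafka 0.10 connector; the object name and topic are hypothetical and the properties are supplied by the caller. `setCommitOffsetsOnCheckpoints` only takes effect when checkpointing is enabled, and it does not change what gets restored from a checkpoint; it only means the group offsets stored in Kafka no longer track the job's progress.

```scala
import java.util.Properties

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object NoCommitConsumer {
  // Builds a consumer for a hypothetical "events" topic.
  def create(props: Properties): FlinkKafkaConsumer010[String] = {
    val consumer = new FlinkKafkaConsumer010[String]("events", new SimpleStringSchema(), props)
    // Flink 1.3+: do not commit offsets back to Kafka when checkpoints complete.
    // Recovery still uses the offsets stored in Flink's own checkpoints.
    consumer.setCommitOffsetsOnCheckpoints(false)
    consumer
  }
}
```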

> What we found is that Flink acks Kafka immediately before even writing to S3. The consequence of this seems to be that if the job gets cancelled before the acked events are flushed to S3 then these are lost

So, at first glance, your description suggests a problem with the state snapshotting of the bucketing sink, i.e. that data is not properly flushed to S3 by the time the bucketing sink's `snapshotState` returns. I'm not very familiar with the bucketing sink, so this is just a superficial guess based on what you described.

> Flink doesn't seem to keep in its checkpointed state the fact that it acked those events but never flushed them to S3.

Keep in mind that there are two separate pieces of state involved here: 1) the offsets checkpointed as state of the Kafka consumer source, and 2) the bucket state of the sink, which keeps track of events that are not yet committed with respect to Flink's checkpoints. In the bucketing sink, an event counts as committed once the Flink checkpoint it belongs to has completed. For details, I recommend the class Javadoc of `BucketingSink`.
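To make the bucket-state lifecycle concrete, here is a hypothetical toy model: a part file is in progress while being written, becomes pending when it is closed, and becomes finished only once a checkpoint covering it completes, or when a job restores from a checkpoint that knows about it. It is a simplification, not `BucketingSink`'s code: one bucket, no parallelism, no valid-length truncation, and file contents are not tracked. The method names only mirror the real sink's hooks.

```scala
import scala.collection.mutable

// Hypothetical toy model of the part-file lifecycle; NOT BucketingSink's implementation.
sealed trait PartState
case object InProgress extends PartState
case object Pending extends PartState
case object Finished extends PartState

// What the toy sink stores in a checkpoint: the open part file, plus the
// pending part files each not-yet-completed checkpoint is responsible for.
final case class SinkSnapshot(inProgress: Option[String], pendingPerCheckpoint: Map[Long, List[String]])

// `fs` stands in for the S3 bucket and survives job restarts.
final class ToyBucketingSink(fs: mutable.Map[String, PartState], restored: Option[SinkSnapshot]) {
  private var current: Option[String] = None
  private var pending: List[String] = Nil // closed since the last snapshot
  private val pendingPerCheckpoint = mutable.Map.empty[Long, List[String]]

  // On restore, every file the checkpoint knew about is promoted to Finished.
  restored.foreach { s =>
    (s.inProgress.toList ++ s.pendingPerCheckpoint.values.flatten)
      .foreach(name => fs.update(name, Finished))
  }

  // Writing an event opens a new in-progress part file if none is open.
  def write(): Unit = if (current.isEmpty) {
    val name = s"part-${fs.size}"
    fs.update(name, InProgress)
    current = Some(name)
  }

  // Closing the open file (inactivity timeout, or close() on cancel) makes it pending.
  def roll(): Unit = current.foreach { name =>
    fs.update(name, Pending)
    pending ::= name
    current = None
  }

  def snapshotState(checkpointId: Long): SinkSnapshot = {
    pendingPerCheckpoint(checkpointId) = pending
    pending = Nil
    SinkSnapshot(current, pendingPerCheckpoint.toMap)
  }

  // Pending files of every checkpoint up to the completed one become Finished.
  def notifyCheckpointComplete(checkpointId: Long): Unit =
    pendingPerCheckpoint.keys.filter(_ <= checkpointId).toList.foreach { id =>
      pendingPerCheckpoint.remove(id).getOrElse(Nil).foreach(name => fs.update(name, Finished))
    }
}
```

In this model, a file that is still open when checkpoint 2 completes and is then rolled by a cancel ends up as pending. Restarting from checkpoint 2 promotes it, while a job started without restored state never learns about it. Because the Kafka offsets were already committed when checkpoint 2 completed, a fresh consumer would not re-read those events either.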

@Sihua: the bucketing sink also manages bucket state to achieve exactly-once semantics.

Cheers,
Gordon

On 20 July 2017 at 10:46:52 AM, 周思华 ([hidden email]) wrote:

Re: Re: Problem with Flink restoring from checkpoints

Francisco Blaya
Hi,

Thanks for your answers.

@Fabian: I can see that Flink's consumer commits the offsets to Kafka; no problem there. However, I'd expect everything read from Kafka to appear in S3 at some point, even if the job is stopped or killed before flushing and then restarted. That is not happening, so I do see data loss in S3.

@Sihua: I assume that, because the DateTimeBucketer is part of the stream's sink, its state gets snapshotted by Flink through the checkpoint mechanism.

@Gordon: Yes, by acking I mean committing the offsets back to Kafka. I agree with you: the problem seems to be related to the state snapshotting of the bucketing sink and has nothing to do with Kafka. Could you please clarify what you mean by "events are considered as committed in bucketed sinks when the Flink checkpoint it is part of is complete"? By uncommitted events in the bucket state, do you mean events that haven't been written to S3 yet?

Cheers,
Fran
On 20 July 2017 at 07:29, Tzu-Li (Gordon) Tai <[hidden email]> wrote:

Re: Re: Problem with Flink restoring from checkpoints

Francisco Blaya
I forgot to add: when a job is cancelled via the UI (but not when the YARN session is killed), a part file ending in ".pending" does appear in S3, but it never seems to be promoted to a finished file when the job is restarted.

On 20 July 2017 at 11:41, Francisco Blaya <[hidden email]> wrote:

Re: Problem with Flink restoring from checkpoints

Aljoscha Krettek
You said you cancel and then restart the job. How exactly do you restart it: from a savepoint or from an externalised checkpoint? Do you also see missing data when restoring from an externalised checkpoint or a savepoint?
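The restart options Aljoscha asks about can be sketched with the Flink 1.2 CLI as below. `JOB_ID`, `SAVEPOINT_DIR`, `SAVEPOINT_PATH`, `CHECKPOINT_METADATA_PATH` and `JOB_JAR` are hypothetical placeholders the reader would set. When submitting against a YARN session, the CLI may additionally need `-yid` with the YARN application ID.

```shell
# Option 1: cancel with a savepoint (the CLI prints the savepoint path) ...
bin/flink cancel -s "$SAVEPOINT_DIR" "$JOB_ID"
# ... then start the job from that savepoint.
bin/flink run -s "$SAVEPOINT_PATH" "$JOB_JAR"

# Option 2: resume from the metadata of a retained externalized checkpoint,
# found under the directory configured as state.checkpoints.dir.
bin/flink run -s "$CHECKPOINT_METADATA_PATH" "$JOB_JAR"

# Without -s, the job starts with empty state: the Kafka consumer falls back to
# the committed group offsets, and the bucketing sink knows nothing about
# existing .pending files.
bin/flink run "$JOB_JAR"
```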

Best,
Aljoscha

On 20. Jul 2017, at 16:15, Francisco Blaya <[hidden email]> wrote:

Forgot to add that when a job gets cancelled via the UI (this is not the case when the Yarn session is killed) a part file ending in ".pending" does appear in S3, but that never seems to be promoted to finished upon restart of the job

On 20 July 2017 at 11:41, Francisco Blaya <[hidden email]> wrote:
Hi,

Thanks for your answers.

@Fabian. I can see that Flink's consumer commits the offsets to Kafka, no problem there. However I'd expect everything that gets read from Kafka to appear in S3 at some point, even if the job gets stopped/killed before flushing and then restarted. And that's what is not happening. So I see data loss in S3.

@Sihua. I assume that the fact that the DataTimeBucketer is configured as the sink of the stream means that its state gets snapshoted by Flink through the checkpoint mechanism.

@Gordon. When I say acking I mean indeed committing the offset back to Kafka. I agree with you, the problem seems to be related to the state snapshotting of the bucketing sink, nothing to do with Kafka. Could you please clarify what you mean with "events are considered as committed in bucketed sinks when the Flink checkpoint it is part of is complete"? When you talk about uncommitted events of the bucket state you mean events that haven't been written to S3?

Cheers,
Fran
On 20 July 2017 at 07:29, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi,

What Fabian mentioned is true. Flink Kafka Consumer’s exactly-once guarantee relies on offsets checkpoints as Flink state, and doesn’t rely on the committed offsets in Kafka.

What we found is that Flink acks Kafka immediately before even writing to S3.

What you mean by ack here is the offset committing back to Kafka, correct?
First of all, as mentioned, this behavior is not related to exactly-once. In fact, in Flink 1.3, you can completely turn this off and still achieve exactly-once (with Flink checkpointing enabled).
One other thing to mention is that offsets are committed to Kafka when all sinks in the job complete their state snapshot.

What we found is that Flink acks Kafka immediately before even writing to S3. The consequence of this seems to be that if the job gets cancelled before the acked events are flushed to S3 then these are lost

So, at a first look on your description, this seems like a problem with the state snapshotting of the bucketing sink. This is suggesting that data is not flushed to S3 properly when `snapshotState` of the bucketing sink returns. I’m not entirely familiar with the bucketing sink, so this is just a superficial guess from what you described.

Flink doesn't seem to keep in its checkpointed state the fact that it acked those events but never flushed them to S3.

Keep in mind that this is two separate states we’re talking about here. 1) the offsets checkpointed as state of the Kafka consumer source, and 2) bucket state (which should keep track of uncommitted events w.r.t. Flink’s checkpoints; events are considered as committed in bucketed sinks when the Flink checkpoint it is part of is complete). For details on this I recommend checking out the class Javadoc of `BucketingSink`.

@Sihua
the bucketing sink also manages bucket states to achieve exactly-once semantics.

Cheers,
Gordon

On 20 July 2017 at 10:46:52 AM, 周思华 ([hidden email]) wrote:

Hi Fran,

is the DataTimeBucketer acts like a memory buffer and does't managed by flink's state? If so, then i think the problem is not about Kafka, but about the DateTimeBucketer. Flink won't take snapshot for the DataTimeBucketer if it not in any state.

Best, 
Sihua Zhou


At 2017-07-20 03:02:20, "Fabian Hueske" <[hidden email]> wrote:
Hi Fran,

did you observe actual data loss due to the problem you are describing or are you discussing a possible issue based on your observations?

AFAIK, Flink's Kafka consumer keeps track of the offsets itself and includes these in the checkpoints. In case of a recovery, it does not rely on the offsets which were committed back to Kafka but only on the offsets it checkpointed itself.
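The restore decision described above fits in one hypothetical helper. `StartOffsets.startingOffset` is an illustrative name, not a Flink method, and the fallback branch assumes the consumer's default of starting from the group's committed offsets when there is no state to restore. When a checkpoint or savepoint is restored, the offset committed in Kafka plays no role.

```java
import java.util.OptionalLong;

/** Hypothetical decision a checkpoint-aware Kafka source makes when it (re)starts. */
final class StartOffsets {
    private StartOffsets() {}

    /**
     * @param restoredFromCheckpoint offset stored in the restored checkpoint/savepoint, if any
     * @param committedGroupOffset   offset committed to Kafka for the consumer group
     */
    static long startingOffset(OptionalLong restoredFromCheckpoint, long committedGroupOffset) {
        // With restored state, the committed group offset is ignored entirely.
        if (restoredFromCheckpoint.isPresent()) {
            return restoredFromCheckpoint.getAsLong();
        }
        // Fresh start without state: assumed default of resuming from the committed group offset.
        return committedGroupOffset;
    }
}
```

For example, if Kafka shows a committed offset of 150 but the restored checkpoint holds 100, reading resumes at 100.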
Gordon (in CC) is familiar with all details of Flink's Kafka consumer and can give a more detailed answer.

Best, Fabian

2017-07-19 16:55 GMT+02:00 Francisco Blaya <[hidden email]>:
Hi,

We have a Flink job running on AWS EMR sourcing a Kafka topic and persisting the events to S3 through a DateTimeBucketer. We configured the bucketer to flush to S3 after an inactivity period of 5 mins. The rate at which events are written to Kafka in the first place is very low, so it is easy for us to investigate how the Flink job recovers with respect to Kafka offsets after the job gets cancelled or the Yarn session is killed.

What we found is that Flink acks Kafka immediately before even writing to S3. The consequence of this seems to be that if the job gets cancelled before the acked events are flushed to S3 then these are lost, they don't get written when the job restarts. Flink doesn't seem to keep in its checkpointed state the fact that it acked those events but never flushed them to S3. Checkpoints are created every 5 seconds in S3.

We've also tried to configure externalized checkpoints through the "state.checkpoints.dir" configuration key and "env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)" in the job, so that they don't automatically get cleaned up when the job gets cancelled or the Yarn session killed. We can see the job uses a restored checkpoint upon restart, but we still get missing events in S3.
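The two cleanup modes differ only in what happens on cancellation. Below is a small decision table written with standalone enums whose constants mirror the names of Flink's `ExternalizedCheckpointCleanup` values; `Cleanup`, `JobEnd` and `RetentionRule` are stand-ins, not Flink types. As the Flink checkpointing documentation describes it, an externalized checkpoint is kept when the job fails in either mode, and kept on cancellation only with `RETAIN_ON_CANCELLATION`.

```java
/** Stand-in for the two cleanup modes (names mirror Flink's, but this is not the Flink enum). */
enum Cleanup { RETAIN_ON_CANCELLATION, DELETE_ON_CANCELLATION }

/** Hypothetical terminal states of a job. */
enum JobEnd { CANCELED, FAILED }

final class RetentionRule {
    private RetentionRule() {}

    static boolean isRetained(Cleanup mode, JobEnd end) {
        // On failure the externalized checkpoint is kept in both modes;
        // on cancellation it is kept only with RETAIN_ON_CANCELLATION.
        return end == JobEnd.FAILED || mode == Cleanup.RETAIN_ON_CANCELLATION;
    }
}
```

Retaining a checkpoint only helps with data that is actually part of the checkpointed state, which is why a successful restore can still leave gaps in S3.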

Has anyone come across this behaviour before? Are we assuming something wrong?

We're using EMR 5.4.0 and Flink 1.2.0.

Regards,
Fran



Re: Problem with Flink restoring from checkpoints

Francisco Blaya
Hi Aljoscha,

I've tried both. When we restart from a manually created savepoint we see "restored from savepoint"  in the Flink Dashboard. If we restart from externalized checkpoint we see "restored from checkpoint". In both scenarios we lose data in S3.

Cheers,
Fran

On 20 July 2017 at 17:54, Aljoscha Krettek <[hidden email]> wrote:
You said you cancel and restart the job. How do you then restart the job? From a savepoint or an externalised checkpoint? Do you also see missing data when using an externalised checkpoint or a savepoint?

Best,
Aljoscha


Re: Problem with Flink restoring from checkpoints

Aljoscha Krettek
Hi,

Sorry for getting back to this so late, but I think the underlying problem is that S3 does not behave the way the Bucketing Sink expects. See this message by Stephan on the topic: https://lists.apache.org/thread.html/34b8ede3affb965c7b5ec1e404918b39c282c258809dfd7a6c257a61@%3Cuser.flink.apache.org%3E

In that message he also mentions a PR we're trying to get merged that should improve using S3 as a sink.

Best,
Aljoscha

On 21. Jul 2017, at 16:04, Francisco Blaya <[hidden email]> wrote:

Hi Aljoscha,

I've tried both. When we restart from a manually created savepoint we see "restored from savepoint"  in the Flink Dashboard. If we restart from externalized checkpoint we see "restored from checkpoint". In both scenarios we lose data in S3.

Cheers,
Fran
