Are savepoints / checkpoints co-ordinated?

Are savepoints / checkpoints co-ordinated?

anand.gopinath

Hi,

 

I have a couple of questions about savepoints / checkpoints.

 

When I issue "Cancel Job with Savepoint", how is that instruction co-ordinated with check points? Am I certain the savepoint will be the last operation (i.e. no more check points)? 

 

I have a Kafka source > operation > Kafka sink job in Flink, and on restart from the savepoint it looks like duplicates are written to the sink topic in Kafka. The duplicates overlap with the last few events prior to the savepoint, and I am trying to work out what could have happened.

My FlinkKafkaProducer011 is set to Semantic.AT_LEAST_ONCE, but checkpointing is enabled with env.enableCheckpointing(parameters.getInt("checkpoint.interval"), CheckpointingMode.EXACTLY_ONCE).

I thought AT_LEAST_ONCE still implies that flushes to Kafka only happen on a checkpoint.
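For reference, the relevant wiring looks roughly like this (a simplified sketch; the topic name, broker address, parameter key and serialization schema are placeholders for what we actually use, and imports are omitted):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // checkpoints requested in EXACTLY_ONCE mode at the configured interval
    env.enableCheckpointing(parameters.getInt("checkpoint.interval"), CheckpointingMode.EXACTLY_ONCE);

    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "broker:9092");

    // sink producer only asks for AT_LEAST_ONCE, i.e. no Kafka transactions are used
    FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
            "sink-topic",
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            producerProps,
            FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);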

 

One theory is that a further checkpoint occurred during or after the savepoint, which would have flushed events to Kafka that are not covered by my savepoint.

 

Any pointers to schoolboy errors I may have made would be appreciated.

 

-----

Also, am I right in thinking that if I have managed state with the RocksDB backend that uses 1 GB on disk, but substantially less keyed state in memory, a savepoint needs to write out the full 1 GB to complete?
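For context, the backend is set up roughly like this (the checkpoint URI is a placeholder; the boolean enables incremental checkpoints):

    // RocksDB keyed-state backend with incremental checkpoints enabled
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));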

 

Regards

Anand



Re: Are savepoints / checkpoints co-ordinated?

Congxian Qiu
AFAIK, "Cancel Job with Savepoint" will stop checkpointScheduler -->  trigger a savepoint, then cancel your job. there will no more checkpoints.


--
GTalk:qcx978132955
Follow your heart in everything
Re: Are savepoints / checkpoints co-ordinated?

vino yang
Hi Anand,

About "Cancel with savepoint" congxian is right.

As for the duplicates, you should use the Kafka producer transactions (available since 0.11) that provide the EXACTLY_ONCE semantic [1].
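A rough sketch of the producer side (topic, schema and broker address are placeholders; the important parts are the EXACTLY_ONCE semantic and keeping the transaction timeout within the broker's transaction.max.timeout.ms):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");
    // must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
    props.setProperty("transaction.timeout.ms", "900000");

    FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
            "sink-topic",
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            props,
            FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

Also note that downstream consumers should read with isolation.level=read_committed, otherwise they will still see records from open or aborted transactions.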

Thanks, vino.



Re: Are savepoints / checkpoints co-ordinated?

Kostas Kloudas
Hi Anand,

Did the suggestion solve your issue?

Essentially, when you cancel with savepoint, as Congxian suggested, Flink stops triggering checkpoints, but data keeps flowing from the source to the sink. So if you do not set the producer to exactly-once, you will almost certainly end up with duplicates.
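(And when you later resume, you pass the savepoint path back to the job, roughly like this, with the path and jar as placeholders:

    bin/flink run -s hdfs:///flink/savepoints/savepoint-xxxx my-job.jar

so the Kafka sources rewind to the offsets stored in that savepoint.)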

Cheers,
Kostas
