Hi, I had a couple questions about savepoints / checkpoints When I issue "Cancel Job with Savepoint", how is that instruction co-ordinated with check points? Am I certain the savepoint will be the last operation (i.e. no more check points)?
I have a kafka src>operation>kafka sink task in flink. And it looks like on restart from the savepoint there are duplicates written to the sink topic in kafka. The dupes overlap with the last few events prior to save point, and I am trying
to work out what could have happened. My FlinkKafkaProducer011 is set to Semantic.AT_LEAST_ONCE, but env.enableCheckpointing(parameters.getInt("checkpoint.interval"), CheckpointingMode.EXACTLY_ONCE).
I thought at least once still implies flushes to kafka still only occur with a checkpoint.
One theory is a further checkpoint occurred after/ during the savepoint - which would have flushed events to kafka that are not in my savepoint.
Any pointers to schoolboy errors I may have made would be appreciated.
----- Also am I right in thinking if I have managed state with rocksdb back end that is using 1G on disk, but substantially less keyed state in memory, a savepoint needs to save the full 1G to complete? Regards Anand Visit our website at http://www.ubs.com This message contains confidential information and is intended only for the individual named. If you are not the named addressee you should not disseminate, distribute or copy this e-mail. Please notify the sender immediately by e-mail if you have received this e-mail by mistake and delete this e-mail from your system. E-mails are not encrypted and cannot be guaranteed to be secure or error-free as information could be intercepted, corrupted, lost, destroyed, arrive late or incomplete, or contain viruses. The sender therefore does not accept liability for any errors or omissions in the contents of this message which arise as a result of e-mail transmission. If verification is required please request a hard-copy version. This message is provided for informational purposes and should not be construed as a solicitation or offer to buy or sell any securities or related financial instruments. UBS Limited is a company limited by shares incorporated in the United Kingdom registered in England and Wales with number 2035362. Registered Office: 5 Broadgate, London EC2M 2QS UBS Limited is authorised by the Prudential Regulation Authority and regulated by the Financial Conduct Authority and the Prudential Regulation Authority. UBS AG is a public company incorporated with limited liability in Switzerland domiciled in the Canton of Basel-City and the Canton of Zurich respectively registered at the Commercial Registry offices in those Cantons with new Identification No: CHE-101.329.561 as from 18 December 2013 (and prior to 18 December 2013 with Identification No: CH-270.3.004.646-4) and having respective head offices at Aeschenvorstadt 1, 4051 Basel and Bahnhofstrasse 45, 8001 Zurich, Switzerland and is authorised and regulated by the Financial Market Supervisory Authority in Switzerland. Registered in the United Kingdom as a foreign company with No: FC021146 and having a UK Establishment registered at Companies House, Cardiff, with No: BR 004507. The principal office of UK Establishment: 5 Broadgate, London EC2M 2QS. In the United Kingdom, UBS AG is authorised by the Prudential Regulation Authority and subject to regulation by the Financial Conduct Authority and limited regulation by the Prudential Regulation Authority. Details about the extent of our regulation by the Prudential Regulation Authority are available from us on request. UBS Business Solutions AG is a public company incorporated with limited liability in Switzerland domiciled in the Canton of Zurich registered at the Commercial Registry office with Identification No: CHE-262.289.477 and having its head office at Bahnhofstrasse 45, 8001 Zurich, Switzerland. Registered in the United Kingdom as a foreign company with No: FC034139 and having a UK Establishment registered at Companies House, Cardiff, with No: BR019277. The principal office of UK Establishment: 5 Broadgate London EC2M 2QS. UBS reserves the right to retain all messages. Messages are protected and accessed only in legally justified cases. |
AFAIK, "Cancel Job with Savepoint" will stop checkpointScheduler --> trigger a savepoint, then cancel your job. there will no more checkpoints. <[hidden email]> 于2018年10月12日周五 上午1:30写道:
|
Hi Anand, About "Cancel with savepoint" congxian is right. And for the duplicates, You should use kafka producer transaction (since 0.11) provided EXACTLY_ONCE semantic[1]. Thanks, vino. [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kafka.html#kafka-011 Congxian Qiu <[hidden email]> 于2018年10月12日周五 下午7:55写道:
|
Hi Anand,
Did the suggestion solve your issue? Essentially when you cancel with savepoint, as Congxian suggested, you stop emitting checkpoints, but data keep flowing from the source to the sink. So if you do not set the producer to exactly once, you will almost certainly end up with duplicates. Cheers, Kostas
|
Free forum by Nabble | Edit this page |