Re: recover from svaepoint

Posted by Piotr Nowojski-4 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/recover-from-svaepoint-tp44081p44137.html

Hi,

I think there is no generic way. If this error indeed happened after starting a second job from the same savepoint, or something like that, the user can change the Sink's operator UID.

If the issue is an intentional recovery from an earlier checkpoint/savepoint, maybe `FlinkKafkaProducer#setLogFailuresOnly` will be helpful.

Best, Piotrek

Tue, 1 Jun 2021 at 15:16 Till Rohrmann <[hidden email]> wrote:
The error message says that we are trying to reuse a transaction id that is
currently being used or has expired.

I am not 100% sure how this can happen. My suspicion is that you have
resumed a job multiple times from the same savepoint. Have you checked that
there is no other job which has been resumed from the same savepoint and
which is currently running or has run and completed checkpoints?

@pnowojski <[hidden email]> @Becket Qin <[hidden email]> how
does the transaction id generation ensure that we don't have a clash of
transaction ids if we resume the same job multiple times from the same
savepoint? From the code, I do see that we have a TransactionalIdsGenerator
which is initialized with the taskName and the operator UID.
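To illustrate the reasoning above: if transactional ids are derived purely from the task name and operator UID (plus subtask index and a pool offset), then two jobs resumed from the same savepoint derive identical ids, which triggers the `InitProducerId` epoch error. The sketch below is illustrative only, with a hypothetical `transactionalId` derivation; it is not Flink's actual `TransactionalIdsGenerator` implementation.

```java
// Illustrative sketch (NOT Flink's actual TransactionalIdsGenerator):
// shows why ids derived from taskName + operator UID clash when two jobs
// are resumed from the same savepoint, and why changing the UID avoids it.
public class TxnIdSketch {

    // Hypothetical derivation: prefix from taskName and operator UID,
    // disambiguated only by subtask index and a pool offset.
    static String transactionalId(String taskName, String operatorUid,
                                  int subtaskIndex, int poolOffset) {
        return taskName + "-" + operatorUid + "-" + subtaskIndex + "-" + poolOffset;
    }

    public static void main(String[] args) {
        // Two jobs restored from the same savepoint keep the same taskName
        // and operator UID, so subtask 0 of each derives the SAME id:
        String jobA = transactionalId("Sink: KafkaTable", "uid-1", 0, 0);
        String jobB = transactionalId("Sink: KafkaTable", "uid-1", 0, 0);
        System.out.println(jobA.equals(jobB)); // true -> clash, epoch error

        // Changing the sink's operator UID changes every derived id:
        String jobC = transactionalId("Sink: KafkaTable", "uid-2", 0, 0);
        System.out.println(jobA.equals(jobC)); // false -> no clash
    }
}
```

This is why Piotrek's suggestion of changing the Sink's operator UID works: the UID feeds into the id prefix, so a new UID yields a disjoint set of transactional ids.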

fyi: @Arvid Heise <[hidden email]>

Cheers,
Till


On Mon, May 31, 2021 at 11:10 AM 周瑞 <[hidden email]> wrote:

> Hi:
>       When "sink.semantic = exactly-once", the following exception is
> thrown when recovering from a savepoint
>
>        public static final String KAFKA_TABLE_FORMAT =
>             "CREATE TABLE " + TABLE_NAME + " (\n" +
>             "  " + COLUMN_NAME + " STRING\n" +
>             ") WITH (\n" +
>             "   'connector' = 'kafka',\n" +
>             "   'topic' = '%s',\n" +
>             "   'properties.bootstrap.servers' = '%s',\n" +
>             "   'sink.semantic' = 'exactly-once',\n" +
>             "   'properties.transaction.timeout.ms' = '900000',\n" +
>             "   'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>             "   'format' = 'dbz-json'\n" +
>             ")\n";
>   [] - Source: TableSourceScan(table=[[default_catalog, default_database,
> debezium_source]], fields=[data]) -> Sink: Sink
> (table=[default_catalog.default_database.KafkaTable], fields=[data])
> (1/1)#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to
> FAILED with failure cause: org.apache.kafka.common.KafkaException:
> Unexpected error in InitProducerIdResponse; Producer attempted an
> operation with an old epoch. Either there is a newer producer with the
> same transactionalId, or the producer's transaction has been expired by
> the broker.
>     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>     at java.lang.Thread.run(Thread.java:748)
>