recover from svaepoint

Posted by 周瑞 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/recover-from-svaepoint-tp44081.html

HI:
      When "sink.semantic = exactly-once", the following exception is thrown when recovering from svaepoint

       public static final String KAFKA_TABLE_FORMAT =
            "CREATE TABLE "+TABLE_NAME+" (\n" +
                    "  "+COLUMN_NAME+" STRING\n" +
                    ") WITH (\n" +
                    "   'connector' = 'kafka',\n" +
                    "   'topic' = '%s',\n" +
                    "   'properties.bootstrap.servers' = '%s',\n" +
                    "   'sink.semantic' = 'exactly-once',\n" +
                    "   'properties.transaction.timeout.ms' = '900000',\n" +
                    "   'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
                    "   'format' = 'dbz-json'\n" +
                    ")\n";
  [] - SourceTableSourceScan(table=[[default_catalog, default_database, debezium_source]], fields=[data]) -> SinkSink(table=[default_catalog.default_database.KafkaTable], fields=[data]) (1/1)#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to FAILED with failure cause: org.apache.kafka.common.KafkaExceptionUnexpected error in InitProducerIdResponseProducer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at java.lang.Thread.run(Thread.java:748)