Flink kafka

周瑞

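This program is meant to test Flink Kafka exactly-once. A normal submission runs fine, but when the job is restored from a savepoint it fails with the error below.
On the Kafka server side, transaction.timeout.ms = 900000 is configured.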

public static final String KAFKA_TABLE_FORMAT =
"CREATE TABLE "+TABLE_NAME+" (\n" +
" "+COLUMN_NAME+" STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = '%s',\n" +
" 'properties.bootstrap.servers' = '%s',\n" +
" 'sink.semantic' = 'exactly-once',\n" +
" 'properties.transaction.timeout.ms' = '900000',\n" +
" 'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
" 'properties.max.in.flight.requests.per.connection' = '1',\n" +
" 'properties.enable.idempotence' = 'true',\n" +
" 'properties.transactional.id' = '%s',\n" +
" 'format' = 'dbz-json'\n" +
")\n";
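
For readers following along, here is a minimal sketch of how a template like this is presumably filled in before being handed to the table environment. The three `%s` placeholders are the topic, the bootstrap servers, and the transactional id, in that order. The values of `TABLE_NAME` and `COLUMN_NAME` are assumptions taken from the log line (`KafkaTable2`, `data`), and the class name, `buildDdl`, and the sample arguments are hypothetical.

```java
public class KafkaDdlExample {
    // Assumed values: the post does not show these constants; the names
    // below are taken from the sink table and field seen in the log.
    static final String TABLE_NAME = "KafkaTable2";
    static final String COLUMN_NAME = "data";

    // Same template as in the post, with three %s placeholders.
    public static final String KAFKA_TABLE_FORMAT =
        "CREATE TABLE " + TABLE_NAME + " (\n" +
        " " + COLUMN_NAME + " STRING\n" +
        ") WITH (\n" +
        " 'connector' = 'kafka',\n" +
        " 'topic' = '%s',\n" +
        " 'properties.bootstrap.servers' = '%s',\n" +
        " 'sink.semantic' = 'exactly-once',\n" +
        " 'properties.transaction.timeout.ms' = '900000',\n" +
        " 'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
        " 'properties.max.in.flight.requests.per.connection' = '1',\n" +
        " 'properties.enable.idempotence' = 'true',\n" +
        " 'properties.transactional.id' = '%s',\n" +
        " 'format' = 'dbz-json'\n" +
        ")\n";

    // Hypothetical helper: substitutes topic, bootstrap servers and
    // transactional id into the template, in that order.
    public static String buildDdl(String topic, String bootstrapServers, String transactionalId) {
        return String.format(KAFKA_TABLE_FORMAT, topic, bootstrapServers, transactionalId);
    }

    public static void main(String[] args) {
        // Sample arguments are placeholders, not values from the post.
        System.out.println(buildDdl("demo-topic", "localhost:9092", "demo-txn-id"));
    }
}
```

Printing the resulting statement before executing it makes it easy to confirm which transactional id a given run actually uses, which matters because the error below mentions a second producer with the same transactionalId.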

2021-05-30 19:38:57,513 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, debezium_source]], fields=[data]) -> Sink: Sink(table=[default_catalog.default_database.KafkaTable2], fields=[data]) (1/1)#144518 (4739d37a5f82268901f8fb51b39735e9) switched from INITIALIZING to FAILED with failure cause: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at java.lang.Thread.run(Thread.java:748)
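
The exception text names two possible causes: a newer producer with the same transactionalId, or a transaction that the broker has already expired. For the second one, a rough sanity check is to compare the configured timeout with the time that passed between taking the savepoint and restoring from it. The sketch below only does that arithmetic. The 900000 ms value comes from the post; the class, the method name, and the 20-minute gap are hypothetical, and whether this gap is the relevant quantity for this particular job is an assumption.

```java
import java.time.Duration;

public class TransactionTimeoutCheck {
    // Returns true when the time elapsed since the savepoint is at least
    // as long as the transaction timeout, i.e. an open transaction could
    // already have been expired by the broker.
    public static boolean mayHaveExpired(Duration transactionTimeout, Duration sinceSavepoint) {
        return sinceSavepoint.compareTo(transactionTimeout) >= 0;
    }

    public static void main(String[] args) {
        // Value from the post: transaction.timeout.ms = 900000
        Duration timeout = Duration.ofMillis(900_000);
        System.out.println("timeout in minutes: " + timeout.toMinutes()); // 15

        // Hypothetical gap between savepoint and restore.
        Duration gap = Duration.ofMinutes(20);
        System.out.println("may have expired: " + mayHaveExpired(timeout, gap)); // true
    }
}
```

If the gap is well under 15 minutes, the other cause in the message (another producer using the same transactionalId) is the more likely place to look.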

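I have searched Google for related material but still cannot solve this. Has anyone run into a similar problem, or can anyone suggest how to troubleshoot it?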

Re: Flink kafka

Till Rohrmann
Responded as part of the following discussion https://lists.apache.org/x/thread.html/re85a1fb3f17b5ef1af2844fc1a07b6d2f5e6237bf4c33059e13890ee@%3Cuser.flink.apache.org%3E. Let's continue the discussion there.

Cheers,
Till

On Sun, May 30, 2021 at 2:32 PM 周瑞 <[hidden email]> wrote:
> [...]