Kafka producer failed with InvalidTxnStateException when performing commit transaction

classic Classic list List threaded Threaded
10 messages Options
Reply | Threaded
Open this post in threaded view
|

Kafka producer failed with InvalidTxnStateException when performing commit transaction

Tony Wei
Hi,

Currently, I was trying to update our kafka cluster with larger `transaction.max.timeout.ms`. The
original setting is kafka's default value (i.e. 15 minutes) and I tried to set as 3 hours.

When I was doing rolling-restart for my brokers, this exception came to me on the next checkpoint 
after I restarted the broker with active controller.

java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5 more Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

I have no idea why it happened, and I didn't find any error log from brokers. Does anyone have
this exception before? How can I prevent from this exception when I tried to restart kafka cluster?
Does this exception mean that I will lost data in some of these transactions?

flink cluster version: 1.8.1
kafka cluster version: 1.0.1
flink kafka producer version: universal
producer transaction timeout: 15 minutes
checkpoint interval: 5 minutes
number of concurrent checkpoint: 1
max checkpoint duration before and after the exception occurred:  < 2 minutes

Best,
Tony Wei
Reply | Threaded
Open this post in threaded view
|

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

Fabian Hueske-2
Hi Tony,

I'm sorry I cannot help you with this issue, but Becket (in CC) might have an idea what went wrong here.

Best, Fabian

Am Mi., 14. Aug. 2019 um 07:00 Uhr schrieb Tony Wei <[hidden email]>:
Hi,

Currently, I was trying to update our kafka cluster with larger `transaction.max.timeout.ms`. The
original setting is kafka's default value (i.e. 15 minutes) and I tried to set as 3 hours.

When I was doing rolling-restart for my brokers, this exception came to me on the next checkpoint 
after I restarted the broker with active controller.

java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5 more Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

I have no idea why it happened, and I didn't find any error log from brokers. Does anyone have
this exception before? How can I prevent from this exception when I tried to restart kafka cluster?
Does this exception mean that I will lost data in some of these transactions?

flink cluster version: 1.8.1
kafka cluster version: 1.0.1
flink kafka producer version: universal
producer transaction timeout: 15 minutes
checkpoint interval: 5 minutes
number of concurrent checkpoint: 1
max checkpoint duration before and after the exception occurred:  < 2 minutes

Best,
Tony Wei
Reply | Threaded
Open this post in threaded view
|

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

Tony Wei
Hi,

Has anyone run into the same problem? I have updated my producer transaction timeout to 1.5 hours,
but the problem sill happened when I restarted broker with active controller. It might not due to the
problem that checkpoint duration is too long causing transaction timeout. I had no more clue to find out
what's wrong about my kafka producer. Could someone help me please?

Best,
Tony Wei

Fabian Hueske <[hidden email]> 於 2019年8月16日 週五 下午4:10寫道:
Hi Tony,

I'm sorry I cannot help you with this issue, but Becket (in CC) might have an idea what went wrong here.

Best, Fabian

Am Mi., 14. Aug. 2019 um 07:00 Uhr schrieb Tony Wei <[hidden email]>:
Hi,

Currently, I was trying to update our kafka cluster with larger `transaction.max.timeout.ms`. The
original setting is kafka's default value (i.e. 15 minutes) and I tried to set as 3 hours.

When I was doing rolling-restart for my brokers, this exception came to me on the next checkpoint 
after I restarted the broker with active controller.

java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5 more Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

I have no idea why it happened, and I didn't find any error log from brokers. Does anyone have
this exception before? How can I prevent from this exception when I tried to restart kafka cluster?
Does this exception mean that I will lost data in some of these transactions?

flink cluster version: 1.8.1
kafka cluster version: 1.0.1
flink kafka producer version: universal
producer transaction timeout: 15 minutes
checkpoint interval: 5 minutes
number of concurrent checkpoint: 1
max checkpoint duration before and after the exception occurred:  < 2 minutes

Best,
Tony Wei
Reply | Threaded
Open this post in threaded view
|

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

Becket Qin
Hi Tony,

From the symptom it is not quite clear to me what may cause this issue. Supposedly the TransactionCoordinator is independent of the active controller, so bouncing the active controller should not have special impact on the transactions (at least not every time). If this is stably reproducible, is it possible to turn on debug level logging on kafka.coordinator.transaction.TransactionCoordinator to see what does the broker say?

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <[hidden email]> wrote:
Hi,

Has anyone run into the same problem? I have updated my producer transaction timeout to 1.5 hours,
but the problem sill happened when I restarted broker with active controller. It might not due to the
problem that checkpoint duration is too long causing transaction timeout. I had no more clue to find out
what's wrong about my kafka producer. Could someone help me please?

Best,
Tony Wei

Fabian Hueske <[hidden email]> 於 2019年8月16日 週五 下午4:10寫道:
Hi Tony,

I'm sorry I cannot help you with this issue, but Becket (in CC) might have an idea what went wrong here.

Best, Fabian

Am Mi., 14. Aug. 2019 um 07:00 Uhr schrieb Tony Wei <[hidden email]>:
Hi,

Currently, I was trying to update our kafka cluster with larger `transaction.max.timeout.ms`. The
original setting is kafka's default value (i.e. 15 minutes) and I tried to set as 3 hours.

When I was doing rolling-restart for my brokers, this exception came to me on the next checkpoint 
after I restarted the broker with active controller.

java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5 more Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

I have no idea why it happened, and I didn't find any error log from brokers. Does anyone have
this exception before? How can I prevent from this exception when I tried to restart kafka cluster?
Does this exception mean that I will lost data in some of these transactions?

flink cluster version: 1.8.1
kafka cluster version: 1.0.1
flink kafka producer version: universal
producer transaction timeout: 15 minutes
checkpoint interval: 5 minutes
number of concurrent checkpoint: 1
max checkpoint duration before and after the exception occurred:  < 2 minutes

Best,
Tony Wei
Reply | Threaded
Open this post in threaded view
|

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

Tony Wei
Hi Becket,

I have reproduced this problem in our development environment. Below is the log message with debug level.
Seems that the exception was from broker-3, and I also found other error code in broker-2 during the time.

There are others INVALID_TXN_STATE error for other transaction id. I just list one of them. Above log messages only
shows message with `kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's` substring before `2019-09-18 07:14`.

I didn't see other information to find out why producer tried to make transaction state from EMPTY to COMMIT, and what
made NOT_COORDINATOR happened. Do you have any thought about what's happening? Thanks.

Number of Kafka brokers: 3
logging config for kafka:
log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.transactionAppender.MaxFileSize=10MB
log4j.appender.transactionAppender.MaxBackupIndex=10
log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
log4j.additivity.kafka.coordinator.transaction=true

flink-ui
Timestamp: 2019-09-18, 07:13:43
 
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
    ... 5 more
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

broker-3
[2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790826831) with coordinator epoch 4 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)

broker-2
[2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, produc
erEpoch=0, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789126318) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e6
0de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=1, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789667979) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:06:25,419] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=2, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790385417) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:11:42,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=3, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790702969) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:13:42,779] DEBUG [TransactionCoordinator id=2] Returning NOT_COORDINATOR error code to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:43,633] DEBUG [TransactionCoordinator id=2] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)

Best,
Tony Wei


Becket Qin <[hidden email]> 於 2019年9月2日 週一 下午10:03寫道:
Hi Tony,

From the symptom it is not quite clear to me what may cause this issue. Supposedly the TransactionCoordinator is independent of the active controller, so bouncing the active controller should not have special impact on the transactions (at least not every time). If this is stably reproducible, is it possible to turn on debug level logging on kafka.coordinator.transaction.TransactionCoordinator to see what does the broker say?

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <[hidden email]> wrote:
Hi,

Has anyone run into the same problem? I have updated my producer transaction timeout to 1.5 hours,
but the problem sill happened when I restarted broker with active controller. It might not due to the
problem that checkpoint duration is too long causing transaction timeout. I had no more clue to find out
what's wrong about my kafka producer. Could someone help me please?

Best,
Tony Wei

Fabian Hueske <[hidden email]> 於 2019年8月16日 週五 下午4:10寫道:
Hi Tony,

I'm sorry I cannot help you with this issue, but Becket (in CC) might have an idea what went wrong here.

Best, Fabian

Am Mi., 14. Aug. 2019 um 07:00 Uhr schrieb Tony Wei <[hidden email]>:
Hi,

Currently, I was trying to update our kafka cluster with larger `transaction.max.timeout.ms`. The
original setting is kafka's default value (i.e. 15 minutes) and I tried to set as 3 hours.

When I was doing rolling-restart for my brokers, this exception came to me on the next checkpoint 
after I restarted the broker with active controller.

java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5 more Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

I have no idea why it happened, and I didn't find any error log from brokers. Does anyone have
this exception before? How can I prevent from this exception when I tried to restart kafka cluster?
Does this exception mean that I will lost data in some of these transactions?

flink cluster version: 1.8.1
kafka cluster version: 1.0.1
flink kafka producer version: universal
producer transaction timeout: 15 minutes
checkpoint interval: 5 minutes
number of concurrent checkpoint: 1
max checkpoint duration before and after the exception occurred:  < 2 minutes

Best,
Tony Wei
Reply | Threaded
Open this post in threaded view
|

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

Tony Wei
Hi Becket,

One more thing, I have tried to restart other brokers without active controller, but
this exception might happen as well. So it should be independent  of the active
controller like you said.

Best,
Tony Wei

Tony Wei <[hidden email]> 於 2019年9月18日 週三 下午6:14寫道:
Hi Becket,

I have reproduced this problem in our development environment. Below is the log message with debug level.
Seems that the exception was from broker-3, and I also found other error code in broker-2 during the time.

There are others INVALID_TXN_STATE error for other transaction id. I just list one of them. Above log messages only
shows message with `kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's` substring before `2019-09-18 07:14`.

I didn't see other information to find out why producer tried to make transaction state from EMPTY to COMMIT, and what
made NOT_COORDINATOR happened. Do you have any thought about what's happening? Thanks.

Number of Kafka brokers: 3
logging config for kafka:
log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.transactionAppender.MaxFileSize=10MB
log4j.appender.transactionAppender.MaxBackupIndex=10
log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
log4j.additivity.kafka.coordinator.transaction=true

flink-ui
Timestamp: 2019-09-18, 07:13:43
 
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
    ... 5 more
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

broker-3
[2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790826831) with coordinator epoch 4 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)

broker-2
[2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, produc
erEpoch=0, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789126318) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e6
0de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=1, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789667979) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:06:25,419] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=2, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790385417) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:11:42,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=3, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790702969) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:13:42,779] DEBUG [TransactionCoordinator id=2] Returning NOT_COORDINATOR error code to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:43,633] DEBUG [TransactionCoordinator id=2] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)

Best,
Tony Wei


Becket Qin <[hidden email]> 於 2019年9月2日 週一 下午10:03寫道:
Hi Tony,

From the symptom it is not quite clear to me what may cause this issue. Supposedly the TransactionCoordinator is independent of the active controller, so bouncing the active controller should not have special impact on the transactions (at least not every time). If this is stably reproducible, is it possible to turn on debug level logging on kafka.coordinator.transaction.TransactionCoordinator to see what does the broker say?

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <[hidden email]> wrote:
Hi,

Has anyone run into the same problem? I have updated my producer transaction timeout to 1.5 hours,
but the problem sill happened when I restarted broker with active controller. It might not due to the
problem that checkpoint duration is too long causing transaction timeout. I had no more clue to find out
what's wrong about my kafka producer. Could someone help me please?

Best,
Tony Wei

Fabian Hueske <[hidden email]> 於 2019年8月16日 週五 下午4:10寫道:
Hi Tony,

I'm sorry I cannot help you with this issue, but Becket (in CC) might have an idea what went wrong here.

Best, Fabian

Am Mi., 14. Aug. 2019 um 07:00 Uhr schrieb Tony Wei <[hidden email]>:
Hi,

Currently, I was trying to update our kafka cluster with larger `transaction.max.timeout.ms`. The
original setting is kafka's default value (i.e. 15 minutes) and I tried to set as 3 hours.

When I was doing rolling-restart for my brokers, this exception came to me on the next checkpoint 
after I restarted the broker with active controller.

java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5 more Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

I have no idea why it happened, and I didn't find any error log from brokers. Does anyone have
this exception before? How can I prevent from this exception when I tried to restart kafka cluster?
Does this exception mean that I will lost data in some of these transactions?

flink cluster version: 1.8.1
kafka cluster version: 1.0.1
flink kafka producer version: universal
producer transaction timeout: 15 minutes
checkpoint interval: 5 minutes
number of concurrent checkpoint: 1
max checkpoint duration before and after the exception occurred:  < 2 minutes

Best,
Tony Wei
Reply | Threaded
Open this post in threaded view
|

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

Tony Wei
Hi Becket,

I found that those transactions were tend to be failed with InvalidTxnStateException if
they never sent any records but committed after some brokers being restarted.

Because the error state transition always failed from EMPTY to COMMIT, I run a
job with only one parallelism with or without output to Kafka. I tried to restart brokers
and see what happened on these two situations and found that I couldn't make job failed
when job continuously emitted output to Kafka, but it could fail when it didn't send any
output to Kafka.

I'm not familiar with FlinkKafkaProducer's behavior. I tried to use kafka java producer
to reproduce the exception, but it worked well. Maybe my observation is not correct,
but the experiment result seems like that. Do you have any thoughts on this?

Best,
Tony Wei

Tony Wei <[hidden email]> 於 2019年9月19日 週四 上午11:08寫道:
Hi Becket,

One more thing, I have tried to restart other brokers without active controller, but
this exception might happen as well. So it should be independent  of the active
controller like you said.

Best,
Tony Wei

Tony Wei <[hidden email]> 於 2019年9月18日 週三 下午6:14寫道:
Hi Becket,

I have reproduced this problem in our development environment. Below is the log message with debug level.
Seems that the exception was from broker-3, and I also found other error code in broker-2 during the time.

There are others INVALID_TXN_STATE error for other transaction id. I just list one of them. Above log messages only
shows message with `kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's` substring before `2019-09-18 07:14`.

I didn't see other information to find out why producer tried to make transaction state from EMPTY to COMMIT, and what
made NOT_COORDINATOR happened. Do you have any thought about what's happening? Thanks.

Number of Kafka brokers: 3
logging config for kafka:
log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.transactionAppender.MaxFileSize=10MB
log4j.appender.transactionAppender.MaxBackupIndex=10
log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
log4j.additivity.kafka.coordinator.transaction=true

flink-ui
Timestamp: 2019-09-18, 07:13:43
 
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
    ... 5 more
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

broker-3
[2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790826831) with coordinator epoch 4 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)

broker-2
[2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, produc
erEpoch=0, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789126318) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e6
0de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=1, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789667979) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:06:25,419] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=2, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790385417) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:11:42,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=3, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790702969) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:13:42,779] DEBUG [TransactionCoordinator id=2] Returning NOT_COORDINATOR error code to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:43,633] DEBUG [TransactionCoordinator id=2] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)

Best,
Tony Wei


Becket Qin <[hidden email]> 於 2019年9月2日 週一 下午10:03寫道:
Hi Tony,

From the symptom it is not quite clear to me what may cause this issue. Supposedly the TransactionCoordinator is independent of the active controller, so bouncing the active controller should not have special impact on the transactions (at least not every time). If this is stably reproducible, is it possible to turn on debug level logging on kafka.coordinator.transaction.TransactionCoordinator to see what does the broker say?

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <[hidden email]> wrote:
Hi,

Has anyone run into the same problem? I have updated my producer transaction timeout to 1.5 hours,
but the problem sill happened when I restarted broker with active controller. It might not due to the
problem that checkpoint duration is too long causing transaction timeout. I had no more clue to find out
what's wrong about my kafka producer. Could someone help me please?

Best,
Tony Wei

Fabian Hueske <[hidden email]> 於 2019年8月16日 週五 下午4:10寫道:
Hi Tony,

I'm sorry I cannot help you with this issue, but Becket (in CC) might have an idea what went wrong here.

Best, Fabian

Am Mi., 14. Aug. 2019 um 07:00 Uhr schrieb Tony Wei <[hidden email]>:
Hi,

Currently, I was trying to update our kafka cluster with larger `transaction.max.timeout.ms`. The
original setting is kafka's default value (i.e. 15 minutes) and I tried to set as 3 hours.

When I was doing rolling-restart for my brokers, this exception came to me on the next checkpoint 
after I restarted the broker with active controller.

java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5 more Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

I have no idea why it happened, and I didn't find any error log from brokers. Does anyone have
this exception before? How can I prevent from this exception when I tried to restart kafka cluster?
Does this exception mean that I will lost data in some of these transactions?

flink cluster version: 1.8.1
kafka cluster version: 1.0.1
flink kafka producer version: universal
producer transaction timeout: 15 minutes
checkpoint interval: 5 minutes
number of concurrent checkpoint: 1
max checkpoint duration before and after the exception occurred:  < 2 minutes

Best,
Tony Wei
Reply | Threaded
Open this post in threaded view
|

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

Tony Wei
Hi,

Trying to dig out why `Error.NOT_COORDINATOR` happened in broker, I opened 
flink's log level to DEBUG for producer. And I found some logs from flink side
regarding this error. Below is some log snippet.

It seems that producer client didn't catch this error and retry to find new coordinator.
This caused the transaction state is inconsistent between client side and server side.
Would it be possible that the problem is caused by FlinkKafkaInternalProducer using
java reflection to send `addPartitionsToTransactionHandler` request in
`FlinkKafkaInternalProducer#flushNewPartitions`? Is there any expert who is familiar
with both kafka and flink's kafka connector could help me solve this? Thanks very much.

The attachment is my code to reproduce this problem.
The cluster's versions are the same as I mentioned in my first email.

Best,
Tony Wei

flink taskmanager:
2019-09-20 02:32:45,927 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer  - Flushing new partitions
2019-09-20 02:32:45,927 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1, partitions=[])
2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.producer.internals.Sender            - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1, partitions=[]) to node kafka-broker-1:9092 (id: 1 rack: null)
2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.NetworkClient                        - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Using older server API v0 to send ADD_PARTITIONS_TO_TXN {transactional_id=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3,producer_id=1008,producer_epoch=1,topics=[]} with correlation id 12 to node 1
2019-09-20 02:32:45,937 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Successfully added partitions [] to transaction

kafka-broker-1:
 [2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=1] Initialized transactionalId map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3 with producerId 1008 and producer epoch 1 on partition __transaction_state-37 (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-20 02:32:45,962] DEBUG [TransactionCoordinator id=1] Returning NOT_COORDINATOR error code to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-20 02:32:46,453] DEBUG [TransactionCoordinator id=1] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)



Tony Wei <[hidden email]> 於 2019年9月19日 週四 下午6:25寫道:
Hi Becket,

I found that those transactions were tend to be failed with InvalidTxnStateException if
they never sent any records but committed after some brokers being restarted.

Because the error state transition always failed from EMPTY to COMMIT, I run a
job with only one parallelism with or without output to Kafka. I tried to restart brokers
and see what happened on these two situations and found that I couldn't make job failed
when job continuously emitted output to Kafka, but it could fail when it didn't send any
output to Kafka.

I'm not familiar with FlinkKafkaProducer's behavior. I tried to use kafka java producer
to reproduce the exception, but it worked well. Maybe my observation is not correct,
but the experiment result seems like that. Do you have any thoughts on this?

Best,
Tony Wei

Tony Wei <[hidden email]> 於 2019年9月19日 週四 上午11:08寫道:
Hi Becket,

One more thing, I have tried to restart other brokers without active controller, but
this exception might happen as well. So it should be independent  of the active
controller like you said.

Best,
Tony Wei

Tony Wei <[hidden email]> 於 2019年9月18日 週三 下午6:14寫道:
Hi Becket,

I have reproduced this problem in our development environment. Below is the log message with debug level.
Seems that the exception was from broker-3, and I also found other error code in broker-2 during the time.

There are others INVALID_TXN_STATE error for other transaction id. I just list one of them. Above log messages only
shows message with `kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's` substring before `2019-09-18 07:14`.

I didn't see other information to find out why producer tried to make transaction state from EMPTY to COMMIT, and what
made NOT_COORDINATOR happened. Do you have any thought about what's happening? Thanks.

Number of Kafka brokers: 3
logging config for kafka:
log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.transactionAppender.MaxFileSize=10MB
log4j.appender.transactionAppender.MaxBackupIndex=10
log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
log4j.additivity.kafka.coordinator.transaction=true

flink-ui
Timestamp: 2019-09-18, 07:13:43
 
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
    ... 5 more
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

broker-3
[2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790826831) with coordinator epoch 4 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)

broker-2
[2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, produc
erEpoch=0, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789126318) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e6
0de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=1, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789667979) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:06:25,419] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=2, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790385417) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:11:42,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=3, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790702969) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:13:42,779] DEBUG [TransactionCoordinator id=2] Returning NOT_COORDINATOR error code to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:43,633] DEBUG [TransactionCoordinator id=2] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)

Best,
Tony Wei


Becket Qin <[hidden email]> 於 2019年9月2日 週一 下午10:03寫道:
Hi Tony,

From the symptom it is not quite clear to me what may cause this issue. Supposedly the TransactionCoordinator is independent of the active controller, so bouncing the active controller should not have special impact on the transactions (at least not every time). If this is stably reproducible, is it possible to turn on debug level logging on kafka.coordinator.transaction.TransactionCoordinator to see what does the broker say?

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <[hidden email]> wrote:
Hi,

Has anyone run into the same problem? I have updated my producer transaction timeout to 1.5 hours,
but the problem sill happened when I restarted broker with active controller. It might not due to the
problem that checkpoint duration is too long causing transaction timeout. I had no more clue to find out
what's wrong about my kafka producer. Could someone help me please?

Best,
Tony Wei

Fabian Hueske <[hidden email]> 於 2019年8月16日 週五 下午4:10寫道:
Hi Tony,

I'm sorry I cannot help you with this issue, but Becket (in CC) might have an idea what went wrong here.

Best, Fabian

Am Mi., 14. Aug. 2019 um 07:00 Uhr schrieb Tony Wei <[hidden email]>:
Hi,

Currently, I was trying to update our kafka cluster with larger `transaction.max.timeout.ms`. The
original setting is kafka's default value (i.e. 15 minutes) and I tried to set as 3 hours.

When I was doing rolling-restart for my brokers, this exception came to me on the next checkpoint 
after I restarted the broker with active controller.

java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5 more Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

I have no idea why it happened, and I didn't find any error log from brokers. Does anyone have
this exception before? How can I prevent from this exception when I tried to restart kafka cluster?
Does this exception mean that I will lost data in some of these transactions?

flink cluster version: 1.8.1
kafka cluster version: 1.0.1
flink kafka producer version: universal
producer transaction timeout: 15 minutes
checkpoint interval: 5 minutes
number of concurrent checkpoint: 1
max checkpoint duration before and after the exception occurred:  < 2 minutes

Best,
Tony Wei

Main2.scala (5K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

Tony Wei
Hi,

I found that the source code [1] in kafka showed that it always check if `newPartitionsInTransaction`
is empty before calling `enqueueRequest(addPartitionsToTransactionHandler())`, that is not
applied to flink kafka producer code [2].

I wrote a simple producer with the `flushNewPartitions` copied from flink kafka producer, and
successfully reproduce this exception. Then, I modified the logic in `enqueueNewPartitions` to check
if there is any `newPartitionsInTransaction` before make this request. And this would work well even
if I restarted the broker who owned this transaction's coordinator, since the empty transaction won't
make any request to server.

The attachments are my simple producer code. Please help to verify what I thought is correct. Thanks.

Best,
Tony Wei


Tony Wei <[hidden email]> 於 2019年9月20日 週五 上午11:56寫道:
Hi,

Trying to dig out why `Error.NOT_COORDINATOR` happened in broker, I opened 
flink's log level to DEBUG for producer. And I found some logs from flink side
regarding this error. Below is some log snippet.

It seems that producer client didn't catch this error and retry to find new coordinator.
This caused the transaction state is inconsistent between client side and server side.
Would it be possible that the problem is caused by FlinkKafkaInternalProducer using
java reflection to send `addPartitionsToTransactionHandler` request in
`FlinkKafkaInternalProducer#flushNewPartitions`? Is there any expert who is familiar
with both kafka and flink's kafka connector could help me solve this? Thanks very much.

The attachment is my code to reproduce this problem.
The cluster's versions are the same as I mentioned in my first email.

Best,
Tony Wei

flink taskmanager:
2019-09-20 02:32:45,927 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer  - Flushing new partitions
2019-09-20 02:32:45,927 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1, partitions=[])
2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.producer.internals.Sender            - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1, partitions=[]) to node kafka-broker-1:9092 (id: 1 rack: null)
2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.NetworkClient                        - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Using older server API v0 to send ADD_PARTITIONS_TO_TXN {transactional_id=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3,producer_id=1008,producer_epoch=1,topics=[]} with correlation id 12 to node 1
2019-09-20 02:32:45,937 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Successfully added partitions [] to transaction

kafka-broker-1:
 [2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=1] Initialized transactionalId map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3 with producerId 1008 and producer epoch 1 on partition __transaction_state-37 (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-20 02:32:45,962] DEBUG [TransactionCoordinator id=1] Returning NOT_COORDINATOR error code to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-20 02:32:46,453] DEBUG [TransactionCoordinator id=1] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)



Tony Wei <[hidden email]> 於 2019年9月19日 週四 下午6:25寫道:
Hi Becket,

I found that those transactions were tend to be failed with InvalidTxnStateException if
they never sent any records but committed after some brokers being restarted.

Because the error state transition always failed from EMPTY to COMMIT, I run a
job with only one parallelism with or without output to Kafka. I tried to restart brokers
and see what happened on these two situations and found that I couldn't make job failed
when job continuously emitted output to Kafka, but it could fail when it didn't send any
output to Kafka.

I'm not familiar with FlinkKafkaProducer's behavior. I tried to use kafka java producer
to reproduce the exception, but it worked well. Maybe my observation is not correct,
but the experiment result seems like that. Do you have any thoughts on this?

Best,
Tony Wei

Tony Wei <[hidden email]> 於 2019年9月19日 週四 上午11:08寫道:
Hi Becket,

One more thing, I have tried to restart other brokers without active controller, but
this exception might happen as well. So it should be independent  of the active
controller like you said.

Best,
Tony Wei

Tony Wei <[hidden email]> 於 2019年9月18日 週三 下午6:14寫道:
Hi Becket,

I have reproduced this problem in our development environment. Below is the log message with debug level.
Seems that the exception was from broker-3, and I also found other error code in broker-2 during the time.

There are others INVALID_TXN_STATE error for other transaction id. I just list one of them. Above log messages only
shows message with `kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's` substring before `2019-09-18 07:14`.

I didn't see other information to find out why producer tried to make transaction state from EMPTY to COMMIT, and what
made NOT_COORDINATOR happened. Do you have any thought about what's happening? Thanks.

Number of Kafka brokers: 3
logging config for kafka:
log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.transactionAppender.MaxFileSize=10MB
log4j.appender.transactionAppender.MaxBackupIndex=10
log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
log4j.additivity.kafka.coordinator.transaction=true

flink-ui
Timestamp: 2019-09-18, 07:13:43
 
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
    ... 5 more
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

broker-3
[2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790826831) with coordinator epoch 4 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)

broker-2
[2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, produc
erEpoch=0, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789126318) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e6
0de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=1, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789667979) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:06:25,419] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=2, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790385417) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:11:42,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=3, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790702969) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:13:42,779] DEBUG [TransactionCoordinator id=2] Returning NOT_COORDINATOR error code to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:43,633] DEBUG [TransactionCoordinator id=2] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)

Best,
Tony Wei


Becket Qin <[hidden email]> 於 2019年9月2日 週一 下午10:03寫道:
Hi Tony,

From the symptom it is not quite clear to me what may cause this issue. Supposedly the TransactionCoordinator is independent of the active controller, so bouncing the active controller should not have special impact on the transactions (at least not every time). If this is stably reproducible, is it possible to turn on debug level logging on kafka.coordinator.transaction.TransactionCoordinator to see what does the broker say?

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <[hidden email]> wrote:
Hi,

Has anyone run into the same problem? I have updated my producer transaction timeout to 1.5 hours,
but the problem sill happened when I restarted broker with active controller. It might not due to the
problem that checkpoint duration is too long causing transaction timeout. I had no more clue to find out
what's wrong about my kafka producer. Could someone help me please?

Best,
Tony Wei

Fabian Hueske <[hidden email]> 於 2019年8月16日 週五 下午4:10寫道:
Hi Tony,

I'm sorry I cannot help you with this issue, but Becket (in CC) might have an idea what went wrong here.

Best, Fabian

Am Mi., 14. Aug. 2019 um 07:00 Uhr schrieb Tony Wei <[hidden email]>:
Hi,

Currently, I was trying to update our kafka cluster with larger `transaction.max.timeout.ms`. The
original setting is kafka's default value (i.e. 15 minutes) and I tried to set as 3 hours.

When I was doing rolling-restart for my brokers, this exception came to me on the next checkpoint 
after I restarted the broker with active controller.

java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5 more Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

I have no idea why it happened, and I didn't find any error log from brokers. Does anyone have
this exception before? How can I prevent from this exception when I tried to restart kafka cluster?
Does this exception mean that I will lost data in some of these transactions?

flink cluster version: 1.8.1
kafka cluster version: 1.0.1
flink kafka producer version: universal
producer transaction timeout: 15 minutes
checkpoint interval: 5 minutes
number of concurrent checkpoint: 1
max checkpoint duration before and after the exception occurred:  < 2 minutes

Best,
Tony Wei

Util.java (4K) Download Attachment
Main.scala (2K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

Tony Wei
Hi Becket,

I have read kafka source code and found that the error won't be propagated to client if the list of
topic-partition is empty [1], because it bind the error with each topic-partition. If this list is empty,
then that error won't be packaged into response body. That made the client didn't get the error
message to find the newer coordinator.

Back to this problem, I think the original design of kafka client might not prefer to execute
`enqueueNewPartitions` if there is no added topic-partition. It might be a bug here, and we should
first check if `newPartitionsInTransaction` list is empty before executing `enqueueNewPartitions`
function. Am I right?

If it can be confirmed as a bug, I would like to submit my patch to fix it. Thanks for your help.

Best,
Tony Wei


Tony Wei <[hidden email]> 於 2019年9月20日 週五 下午2:57寫道:
Hi,

I found that the source code [1] in kafka showed that it always check if `newPartitionsInTransaction`
is empty before calling `enqueueRequest(addPartitionsToTransactionHandler())`, that is not
applied to flink kafka producer code [2].

I wrote a simple producer with the `flushNewPartitions` copied from flink kafka producer, and
successfully reproduce this exception. Then, I modified the logic in `enqueueNewPartitions` to check
if there is any `newPartitionsInTransaction` before make this request. And this would work well even
if I restarted the broker who owned this transaction's coordinator, since the empty transaction won't
make any request to server.

The attachments are my simple producer code. Please help to verify what I thought is correct. Thanks.

Best,
Tony Wei


Tony Wei <[hidden email]> 於 2019年9月20日 週五 上午11:56寫道:
Hi,

Trying to dig out why `Error.NOT_COORDINATOR` happened in broker, I opened 
flink's log level to DEBUG for producer. And I found some logs from flink side
regarding this error. Below is some log snippet.

It seems that producer client didn't catch this error and retry to find new coordinator.
This caused the transaction state is inconsistent between client side and server side.
Would it be possible that the problem is caused by FlinkKafkaInternalProducer using
java reflection to send `addPartitionsToTransactionHandler` request in
`FlinkKafkaInternalProducer#flushNewPartitions`? Is there any expert who is familiar
with both kafka and flink's kafka connector could help me solve this? Thanks very much.

The attachment is my code to reproduce this problem.
The cluster's versions are the same as I mentioned in my first email.

Best,
Tony Wei

flink taskmanager:
2019-09-20 02:32:45,927 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer  - Flushing new partitions
2019-09-20 02:32:45,927 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1, partitions=[])
2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.producer.internals.Sender            - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1, partitions=[]) to node kafka-broker-1:9092 (id: 1 rack: null)
2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.NetworkClient                        - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Using older server API v0 to send ADD_PARTITIONS_TO_TXN {transactional_id=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3,producer_id=1008,producer_epoch=1,topics=[]} with correlation id 12 to node 1
2019-09-20 02:32:45,937 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Successfully added partitions [] to transaction

kafka-broker-1:
 [2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=1] Initialized transactionalId map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3 with producerId 1008 and producer epoch 1 on partition __transaction_state-37 (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-20 02:32:45,962] DEBUG [TransactionCoordinator id=1] Returning NOT_COORDINATOR error code to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-20 02:32:46,453] DEBUG [TransactionCoordinator id=1] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)



Tony Wei <[hidden email]> 於 2019年9月19日 週四 下午6:25寫道:
Hi Becket,

I found that those transactions were tend to be failed with InvalidTxnStateException if
they never sent any records but committed after some brokers being restarted.

Because the error state transition always failed from EMPTY to COMMIT, I run a
job with only one parallelism with or without output to Kafka. I tried to restart brokers
and see what happened on these two situations and found that I couldn't make job failed
when job continuously emitted output to Kafka, but it could fail when it didn't send any
output to Kafka.

I'm not familiar with FlinkKafkaProducer's behavior. I tried to use kafka java producer
to reproduce the exception, but it worked well. Maybe my observation is not correct,
but the experiment result seems like that. Do you have any thoughts on this?

Best,
Tony Wei

Tony Wei <[hidden email]> 於 2019年9月19日 週四 上午11:08寫道:
Hi Becket,

One more thing, I have tried to restart other brokers without active controller, but
this exception might happen as well. So it should be independent  of the active
controller like you said.

Best,
Tony Wei

Tony Wei <[hidden email]> 於 2019年9月18日 週三 下午6:14寫道:
Hi Becket,

I have reproduced this problem in our development environment. Below is the log message with debug level.
Seems that the exception was from broker-3, and I also found other error code in broker-2 during the time.

There are others INVALID_TXN_STATE error for other transaction id. I just list one of them. Above log messages only
shows message with `kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's` substring before `2019-09-18 07:14`.

I didn't see other information to find out why producer tried to make transaction state from EMPTY to COMMIT, and what
made NOT_COORDINATOR happened. Do you have any thought about what's happening? Thanks.

Number of Kafka brokers: 3
logging config for kafka:
log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.transactionAppender.MaxFileSize=10MB
log4j.appender.transactionAppender.MaxBackupIndex=10
log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
log4j.additivity.kafka.coordinator.transaction=true

flink-ui
Timestamp: 2019-09-18, 07:13:43
 
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
    ... 5 more
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

broker-3
[2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790826831) with coordinator epoch 4 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)

broker-2
[2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, produc
erEpoch=0, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789126318) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e6
0de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=1, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789667979) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:06:25,419] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=2, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790385417) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:11:42,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=3, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790702969) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2019-09-18 07:13:42,779] DEBUG [TransactionCoordinator id=2] Returning NOT_COORDINATOR error code to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-18 07:13:43,633] DEBUG [TransactionCoordinator id=2] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)

Best,
Tony Wei


Becket Qin <[hidden email]> 於 2019年9月2日 週一 下午10:03寫道:
Hi Tony,

From the symptom it is not quite clear to me what may cause this issue. Supposedly the TransactionCoordinator is independent of the active controller, so bouncing the active controller should not have special impact on the transactions (at least not every time). If this is stably reproducible, is it possible to turn on debug level logging on kafka.coordinator.transaction.TransactionCoordinator to see what does the broker say?

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <[hidden email]> wrote:
Hi,

Has anyone run into the same problem? I have updated my producer transaction timeout to 1.5 hours,
but the problem sill happened when I restarted broker with active controller. It might not due to the
problem that checkpoint duration is too long causing transaction timeout. I had no more clue to find out
what's wrong about my kafka producer. Could someone help me please?

Best,
Tony Wei

Fabian Hueske <[hidden email]> 於 2019年8月16日 週五 下午4:10寫道:
Hi Tony,

I'm sorry I cannot help you with this issue, but Becket (in CC) might have an idea what went wrong here.

Best, Fabian

Am Mi., 14. Aug. 2019 um 07:00 Uhr schrieb Tony Wei <[hidden email]>:
Hi,

Currently, I was trying to update our kafka cluster with larger `transaction.max.timeout.ms`. The
original setting is kafka's default value (i.e. 15 minutes) and I tried to set as 3 hours.

When I was doing rolling-restart for my brokers, this exception came to me on the next checkpoint 
after I restarted the broker with active controller.

java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5 more Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

I have no idea why it happened, and I didn't find any error log from brokers. Does anyone have
this exception before? How can I prevent from this exception when I tried to restart kafka cluster?
Does this exception mean that I will lost data in some of these transactions?

flink cluster version: 1.8.1
kafka cluster version: 1.0.1
flink kafka producer version: universal
producer transaction timeout: 15 minutes
checkpoint interval: 5 minutes
number of concurrent checkpoint: 1
max checkpoint duration before and after the exception occurred:  < 2 minutes

Best,
Tony Wei