Hi,
I recently tried to update our Kafka cluster to use a larger `transaction.max.timeout.ms`. The original setting was Kafka's default value (i.e. 15 minutes), and I tried to set it to 3 hours. While I was doing a rolling restart of my brokers, the following exception appeared on the next checkpoint after I restarted the broker hosting the active controller:

java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
    ... 5 more
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

I have no idea why it happened, and I didn't find any error logs on the brokers. Has anyone seen this exception before? How can I prevent it when I restart the Kafka cluster? Does this exception mean that I will lose the data in some of these transactions?
flink cluster version: 1.8.1
kafka cluster version: 1.0.1
flink kafka producer version: universal
producer transaction timeout: 15 minutes
checkpoint interval: 5 minutes
number of concurrent checkpoint: 1
max checkpoint duration before and after the exception occurred: < 2 minutes

Best,
Tony Wei
|
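The settings listed in this message can be sanity-checked with a small sketch like the following. It assumes two things that the thread itself does not spell out: a broker rejects a producer whose `transaction.timeout.ms` exceeds the broker's `transaction.max.timeout.ms`, and, as a rough rule of thumb only, a transaction should be allowed to stay open for longer than one checkpoint interval plus the longest checkpoint duration. The class and method names (`TxnTimeoutCheck`, `isConsistent`, `producerProps`) are hypothetical and are not part of Flink or Kafka.

```java
import java.time.Duration;
import java.util.Properties;

/** Hypothetical helper (not a Flink or Kafka API) that sanity-checks transaction timeouts. */
final class TxnTimeoutCheck {

    /**
     * Returns true if the producer timeout fits under the broker cap and is longer than
     * one checkpoint interval plus the longest observed checkpoint duration.
     * The second condition is an assumed rule of thumb, not an official formula.
     */
    static boolean isConsistent(Duration brokerMaxTimeout, Duration producerTimeout,
                                Duration checkpointInterval, Duration maxCheckpointDuration) {
        boolean acceptedByBroker = producerTimeout.compareTo(brokerMaxTimeout) <= 0;
        boolean outlivesCheckpoint =
                producerTimeout.compareTo(checkpointInterval.plus(maxCheckpointDuration)) > 0;
        return acceptedByBroker && outlivesCheckpoint;
    }

    /** Builds producer properties carrying the transaction timeout in milliseconds. */
    static Properties producerProps(Duration producerTimeout) {
        Properties props = new Properties();
        props.setProperty("transaction.timeout.ms", Long.toString(producerTimeout.toMillis()));
        return props;
    }

    public static void main(String[] args) {
        // Values taken from the message: broker default 15 min, producer 15 min,
        // checkpoint interval 5 min, checkpoint duration below 2 min.
        Duration brokerMax = Duration.ofMinutes(15);
        Duration producerTimeout = Duration.ofMinutes(15);
        Duration interval = Duration.ofMinutes(5);
        Duration maxDuration = Duration.ofMinutes(2);

        System.out.println(producerProps(producerTimeout));
        System.out.println("consistent = "
                + isConsistent(brokerMax, producerTimeout, interval, maxDuration));
    }
}
```

Under these assumptions the reported values pass the check, which would suggest looking for a cause other than a plain transaction timeout.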
Hi Tony,

I'm sorry I cannot help you with this issue, but Becket (in CC) might have an idea of what went wrong here.

Best,
Fabian

On Wed, Aug 14, 2019 at 07:00, Tony Wei <[hidden email]> wrote:
|
Hi,

Has anyone run into the same problem? I have updated my producer transaction timeout to 1.5 hours, but the problem still happened when I restarted the broker hosting the active controller. So the cause is probably not a transaction timeout triggered by an overly long checkpoint duration. I have no further clues about what is wrong with my Kafka producer. Could someone help me, please?

Best,
Tony Wei

On Fri, Aug 16, 2019 at 4:10 PM, Fabian Hueske <[hidden email]> wrote:
|
Hi Tony,

From the symptom, it is not quite clear to me what may cause this issue. Supposedly the TransactionCoordinator is independent of the active controller, so bouncing the active controller should not have any special impact on the transactions (at least not every time). If this is reliably reproducible, would it be possible to turn on debug-level logging for kafka.coordinator.transaction.TransactionCoordinator to see what the broker says?

Thanks,
Jiangjie (Becket) Qin

On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <[hidden email]> wrote:
|
Hi Becket,

I have reproduced this problem in our development environment. The log messages at debug level are included in this email. It seems that the exception came from broker-3, and I also found another error code on broker-2 around that time. There are other INVALID_TXN_STATE errors for other transaction ids; I list just one of them. The log messages only include lines containing the substring `kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's` from before `2019-09-18 07:14`. I didn't find any other information explaining why the producer tried to move the transaction state from EMPTY to COMMIT, or what caused NOT_COORDINATOR. Do you have any thoughts about what's happening? Thanks.

Number of Kafka brokers: 3
logging config for kafka: log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender

flink-ui
Timestamp: 2019-09-18, 07:13:43
java.lang.RuntimeException: Error while confirming checkpoint

[2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)

broker-2
[2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, produc

Best,
Tony Wei

On Mon, Sep 2, 2019 at 10:03 PM, Becket Qin <[hidden email]> wrote:
|
Hi Becket,

One more thing: I have tried restarting other brokers that did not host the active controller, and this exception can happen then as well. So it should be independent of the active controller, as you said.

Best,
Tony Wei

On Wed, Sep 18, 2019 at 6:14 PM, Tony Wei <[hidden email]> wrote:
|
Hi Becket,

I found that those transactions tended to fail with InvalidTxnStateException if they never sent any records but were committed after some brokers had been restarted. Because the failing state transition was always from EMPTY to COMMIT, I ran a job with a parallelism of one, once with and once without output to Kafka. I restarted brokers in both situations and found that I couldn't make the job fail while it continuously emitted output to Kafka, but it could fail when it didn't send any output to Kafka.

I'm not familiar with FlinkKafkaProducer's behavior. I tried to reproduce the exception with the plain Kafka Java producer, but it worked fine. Maybe my observation is not correct, but that is what the experiment suggests. Do you have any thoughts on this?

Best,
Tony Wei

On Thu, Sep 19, 2019 at 11:08 AM, Tony Wei <[hidden email]> wrote:
|
Hi,

To dig into why `Error.NOT_COORDINATOR` happened on the broker, I set the log level for the producer to DEBUG on the Flink side, and I found some logs from the Flink side regarding this error. A log snippet is included in this email. It seems that the producer client didn't catch this error and retry to find the new coordinator. This left the transaction state inconsistent between the client side and the server side.

Could the problem be caused by FlinkKafkaInternalProducer using Java reflection to send the `addPartitionsToTransactionHandler` request in `FlinkKafkaInternalProducer#flushNewPartitions`? Is there an expert familiar with both Kafka and Flink's Kafka connector who could help me solve this? Thanks very much.

The attachment is my code to reproduce this problem. The cluster versions are the same as I mentioned in my first email.

Best,
Tony Wei

flink taskmanager:
2019-09-20 02:32:45,927 INFO org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer - Flushing new partitions
2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1, partitions=[]) to node kafka-broker-1:9092 (id: 1 rack: null)

kafka-broker-1:
[2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=1] Initialized transactionalId map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3 with producerId 1008 and producer epoch 1 on partition __transaction_state-37 (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-20 02:32:45,962] DEBUG [TransactionCoordinator id=1] Returning NOT_COORDINATOR error code to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)

On Thu, Sep 19, 2019 at 6:25 PM, Tony Wei <[hidden email]> wrote:
Attachment: Main2.scala (5K)
|
Hi,

I found that the Kafka source code [1] always checks whether `newPartitionsInTransaction` is empty before calling `enqueueRequest(addPartitionsToTransactionHandler())`, but this check is not applied in the Flink Kafka producer code [2].

I wrote a simple producer with `flushNewPartitions` copied from the Flink Kafka producer and successfully reproduced this exception. Then I modified the logic in `enqueueNewPartitions` to check whether there is anything in `newPartitionsInTransaction` before making this request. This works well even if I restart the broker that owns this transaction's coordinator, since an empty transaction no longer sends any request to the server.

The attachments are my simple producer code. Please help verify whether my understanding is correct. Thanks.

Best,
Tony Wei

On Fri, Sep 20, 2019 at 11:56 AM, Tony Wei <[hidden email]> wrote:
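The guard described in this message can be illustrated with a minimal, self-contained model. This is only a sketch: it does not use the Kafka client or Flink classes, and apart from `newPartitionsInTransaction`, which is the name quoted in the message, every class and method name (`PartitionFlushModel`, `flushUnguarded`, `flushGuarded`, `pendingRequestCount`) is hypothetical. It only shows the difference between always enqueuing an add-partitions request and skipping it when no partition was added.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

/** Simplified, hypothetical model of a transactional producer's partition bookkeeping. */
final class PartitionFlushModel {
    // Partitions written to in the current transaction but not yet registered with the coordinator.
    private final Set<String> newPartitionsInTransaction = new HashSet<>();
    // Stand-in for the queue of requests that would be sent to the transaction coordinator.
    private final Queue<Set<String>> pendingRequests = new ArrayDeque<>();

    /** Records that the current transaction wrote to the given topic-partition. */
    void addPartition(String topicPartition) {
        newPartitionsInTransaction.add(topicPartition);
    }

    /** Unguarded variant: enqueues a request even when the partition set is empty. */
    void flushUnguarded() {
        pendingRequests.add(new HashSet<>(newPartitionsInTransaction));
        newPartitionsInTransaction.clear();
    }

    /** Guarded variant: an empty transaction sends nothing to the coordinator. */
    void flushGuarded() {
        if (newPartitionsInTransaction.isEmpty()) {
            return;
        }
        pendingRequests.add(new HashSet<>(newPartitionsInTransaction));
        newPartitionsInTransaction.clear();
    }

    /** Number of requests that would have been sent so far. */
    int pendingRequestCount() {
        return pendingRequests.size();
    }

    public static void main(String[] args) {
        PartitionFlushModel unguarded = new PartitionFlushModel();
        unguarded.flushUnguarded(); // empty transaction still produces a request
        PartitionFlushModel guarded = new PartitionFlushModel();
        guarded.flushGuarded();     // empty transaction produces no request
        System.out.println("unguarded requests: " + unguarded.pendingRequestCount());
        System.out.println("guarded requests: " + guarded.pendingRequestCount());
    }
}
```

In this model the guarded variant behaves the same as the unguarded one whenever at least one partition was added, so the change only affects transactions that never wrote a record.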
|
Hi Becket,

I have read the Kafka source code and found that the error is not propagated to the client if the list of topic-partitions is empty [1], because the error is bound to each topic-partition. If this list is empty, the error is not packaged into the response body. As a result, the client never receives the error and never looks up the new coordinator.

Back to this problem: I think the original design of the Kafka client does not intend `enqueueNewPartitions` to be executed when no topic-partition has been added. This might be a bug, and we should first check whether the `newPartitionsInTransaction` list is empty before executing the `enqueueNewPartitions` function. Am I right?

If it can be confirmed as a bug, I would like to submit my patch to fix it. Thanks for your help.

Best,
Tony Wei

On Fri, Sep 20, 2019 at 2:57 PM, Tony Wei <[hidden email]> wrote:
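The propagation problem described in this message can be pictured with a small, self-contained model. It is a sketch under the assumption stated in the message, namely that the broker attaches the error to each topic-partition of the request. It is not Kafka's actual response code, and the names `AddPartitionsResponseModel`, `buildResponse`, and `clientSeesNotCoordinator` are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of an error that is reported per topic-partition. */
final class AddPartitionsResponseModel {

    /** Broker side of the model: the same error is attached to every requested partition. */
    static Map<String, String> buildResponse(List<String> partitions, String error) {
        Map<String, String> errorsByPartition = new LinkedHashMap<>();
        for (String partition : partitions) {
            errorsByPartition.put(partition, error);
        }
        return errorsByPartition;
    }

    /** Client side of the model: the error is visible only if some partition entry carries it. */
    static boolean clientSeesNotCoordinator(Map<String, String> response) {
        return response.containsValue("NOT_COORDINATOR");
    }

    public static void main(String[] args) {
        Map<String, String> withPartitions =
                buildResponse(Arrays.asList("topic-0", "topic-1"), "NOT_COORDINATOR");
        Map<String, String> withoutPartitions =
                buildResponse(Collections.<String>emptyList(), "NOT_COORDINATOR");

        System.out.println("non-empty request, error visible: "
                + clientSeesNotCoordinator(withPartitions));
        System.out.println("empty request, error visible: "
                + clientSeesNotCoordinator(withoutPartitions));
    }
}
```

With an empty partition list the response map has no entries, so in this model the client has nothing that would tell it to look up the coordinator again.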
|