Hi there,
We've started to witness ConsumerCancelledException errors from our RabbitMQ source. We've digged in everywhere, yet couldn't come up with a good explanation.
This is the exception:
com.rabbitmq.client.ConsumerCancelledException
at com.rabbitmq.client.QueueingConsumer.handle(QueueingConsumer.java:208)
at com.rabbitmq.client.QueueingConsumer.nextDelivery(QueueingConsumer.java:223)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.run(RMQSource.java:193)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.rabbitmq.client.ConsumerCancelledException
at com.rabbitmq.client.QueueingConsumer.handleCancel(QueueingConsumer.java:122)
at com.rabbitmq.client.impl.ConsumerDispatcher$3.run(ConsumerDispatcher.java:115)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
We've tried limiting prefetch count 100 and 500, didn't change. We can try 1 by 1, but that doesn't really sound efficient.
Is anyone familiar with possible causes?
The World's Fastest Human Translation Platform.