Strange behaviour when using RMQSource in Flink 1.11.2

Thomas Eckestad
Hi,

We are using the RabbitMQ source connector with exactly-once guarantees. For this to work, according to the official Flink documentation, we supply a correlation ID with each published message and run the source with a parallelism of one, with the Flink job being the only consumer of the queue in question. Checkpointing is enabled.

The following RMQSource behavior seems strange to us. When a job is restarted from a checkpoint and the RabbitMQ queue still holds unacked messages that were processed in the previous checkpoint interval, those messages stay unacked until the job either finishes or is restarted again. When the connection to RabbitMQ is later closed (because the job finished or was restarted), the unacked messages are requeued and redelivered when the next connection is established.

Looking at the source code, the RMQSource ACKs messages after a checkpoint is complete (MessageAcknowledgingSourceBase::notifyCheckpointComplete).
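The ack-after-checkpoint flow described above can be modelled roughly as follows. This is a simplified stand-alone sketch, not the Flink implementation: the class CheckpointAckSketch, its fields, and the per-checkpoint bookkeeping are assumptions made for illustration. Only the method name notifyCheckpointComplete and the sessionIds list are taken from the discussion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-alone model; none of these types come from Flink.
class CheckpointAckSketch {
    // Delivery tags received since the last snapshot.
    final List<Long> sessionIds = new ArrayList<>();
    // Delivery tags grouped by the checkpoint that covers them.
    final TreeMap<Long, List<Long>> pending = new TreeMap<>();
    // Stand-in for the acks that would be sent to RabbitMQ.
    final List<Long> acked = new ArrayList<>();

    void onMessage(long deliveryTag) {
        sessionIds.add(deliveryTag);
    }

    void snapshotState(long checkpointId) {
        // Freeze the current session under this checkpoint and start a new one.
        pending.put(checkpointId, new ArrayList<>(sessionIds));
        sessionIds.clear();
    }

    void notifyCheckpointComplete(long checkpointId) {
        // Ack everything covered by this checkpoint or an earlier one.
        NavigableMap<Long, List<Long>> done = pending.headMap(checkpointId, true);
        for (List<Long> tags : done.values()) {
            acked.addAll(tags);
        }
        done.clear(); // the view is backed by 'pending', so this removes the entries
    }

    public static void main(String[] args) {
        CheckpointAckSketch source = new CheckpointAckSketch();
        source.onMessage(1L);
        source.onMessage(2L);
        source.snapshotState(1L);
        source.onMessage(3L);
        System.out.println(source.acked); // nothing is acked before the checkpoint completes
        source.notifyCheckpointComplete(1L);
        System.out.println(source.acked);      // tags 1 and 2
        System.out.println(source.sessionIds); // tag 3 waits for the next checkpoint
    }
}
```

In this model, a delivery tag that never reaches sessionIds is never acked, which is the situation the rest of this message is about.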

Also, looking at the source code of RMQSource::setMessageIdentifier() (on the master branch; the ACK semantics do not seem to have changed since 1.11.2), it is clear that if an RMQ message carries a correlation ID which has already been handled, that message is skipped and not processed further. It is also clear that skipped messages are not added to the sessionIds list of messages that are targeted for ACK to RMQ. I believe all successfully consumed RMQ messages should be ACKed, regardless of whether the message is ignored or processed by Flink. RMQ needs to know that the consumer considers the message handled.

The following code is from RMQSource::setMessageIdentifier(). Note the return before sessionIds.add():
.
.
.
  if (!addId(correlationId)) {
    // we have already processed this message
    return false;
  }
}
sessionIds.add(deliveryTag);
.
.
.
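A minimal sketch of the behavior argued for here, written as a stand-alone model rather than a patch to Flink: the delivery tag is recorded before the duplicate check, so a skipped message is still acknowledged later. The class AckSketch and the field seenCorrelationIds are hypothetical; only setMessageIdentifier and sessionIds mirror names from the snippet above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-alone model; this is not the Flink RMQSource class.
class AckSketch {
    // Correlation IDs that have already been processed.
    private final Set<String> seenCorrelationIds = new HashSet<>();
    // Delivery tags to acknowledge once the next checkpoint completes.
    final List<Long> sessionIds = new ArrayList<>();

    // Returns true if the message should be processed, false if it is a duplicate.
    boolean setMessageIdentifier(String correlationId, long deliveryTag) {
        // Record the tag first, so duplicates are acknowledged as well.
        sessionIds.add(deliveryTag);
        // Set.add returns false when the ID was already present.
        return seenCorrelationIds.add(correlationId);
    }

    public static void main(String[] args) {
        AckSketch source = new AckSketch();
        System.out.println(source.setMessageIdentifier("id-1", 1L)); // first delivery: true
        System.out.println(source.setMessageIdentifier("id-1", 2L)); // redelivery: false
        System.out.println(source.sessionIds); // both tags are queued for ACK: [1, 2]
    }
}
```

Whether the real source should ack duplicates immediately or together with the next checkpoint is a separate design question that this sketch does not settle.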

Directly related to the above, I also noticed that RMQ connections are leaked at internal job restart. From the Flink log (this stack trace is from 1.11.2):

2020-11-18 10:08:25,118 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Error during disposal of stream operator.
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:228) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:303) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1294) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicCancel(AutorecoveringChannel.java:482) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:192) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]

AlreadyClosedException is not handled by RMQSource::close(). This results in an RMQ connection thread being left behind somewhere. I triggered three restarts of the job in a row and saw one new connection added to the pile of connections for each restart. I triggered each restart by killing the active connection to RMQ using the RMQ admin GUI (management plugin, see the exception details above).
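One way to avoid leaving resources behind in this situation is to close each resource in its own try/catch, so that a failure in an early step (such as cancelling the consumer on an already closed channel) does not prevent the later steps from running. The following is a generic sketch using plain AutoCloseable stand-ins, not the RabbitMQ client API; CloseSketch and closeAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the resources are stand-ins for consumer, channel and connection.
class CloseSketch {
    // Closes every resource even if an earlier one throws, and returns the failures.
    static List<Exception> closeAll(AutoCloseable... resources) {
        List<Exception> failures = new ArrayList<>();
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                // For example an "already closed" error: remember it and keep going.
                failures.add(e);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> closed = new ArrayList<>();
        AutoCloseable consumer = () -> { throw new IllegalStateException("already closed"); };
        AutoCloseable channel = () -> closed.add("channel");
        AutoCloseable connection = () -> closed.add("connection");

        List<Exception> failures = closeAll(consumer, channel, connection);
        // The first step failed, but the channel and connection were still closed.
        System.out.println(closed + " " + failures.size()); // [channel, connection] 1
    }
}
```

The caller can then log the collected failures or rethrow the first one after all resources have been released.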

I also tried to kill one of the leaked connections, but a new one is created instantly when doing so. The stack trace from doing this (1.11.2):

2020-11-18 10:27:51,715 ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler           [] - An unexpected connection driver error occured
java.lang.NoClassDefFoundError: com/rabbitmq/client/AMQP$Connection$CloseOk$Builder
at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:800) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:753) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:237) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:623) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at java.lang.Thread.run(Unknown Source) [?:?]

I have verified that com/rabbitmq/client/AMQP$Connection$CloseOk$Builder is included in the Job-jar:

less <my-flink-jar> | egrep 'AMQP\$Connection\$CloseOk\$Builder'
-rw----     2.0 fat      818 bl defN 20-Nov-11 16:17 com/rabbitmq/client/AMQP$Connection$CloseOk$Builder.class

So, to sum up, it looks like there is a bug regarding ACKs when using correlation IDs. This breaks the exactly-once guarantee of the RMQSource, since unacked messages are requeued after reconnecting to RMQ and thus might be processed more than once.

Also, the clean-up logic of the RMQSource seems buggy. 

Does my reasoning make sense to you?

Best Regards,
Thomas Eckestad

Re: Strange behaviour when using RMQSource in Flink 1.11.2

Andrey Zagrebin-5
Hi Thomas,

I am not an expert on the RMQSource connector, but your concerns look valid.
Could you file a Jira issue in the Flink issue tracker? [1]

I cannot immediately point you to a committer who could help with this, but let's hope that the issue gets attention.
If you want to contribute an improvement for this to Flink, you can write your suggestion there as well,
and once there is positive feedback from a committer, a GitHub PR can be opened.

Best,
Andrey

Re: Strange behaviour when using RMQSource in Flink 1.11.2

Thomas Eckestad
Hi Andrey,

Thank you for your response. I created https://issues.apache.org/jira/browse/FLINK-20244.

Best Regards,
Thomas