RabbitMQ source does not stop unless message arrives in queue

jose-pvargas
Hi,

I am using Flink 1.12 to read from and write to a RabbitMQ cluster. Flink's RabbitMQ source has some surprising behavior when a stop-with-savepoint request is made.

Expected Behavior:
The stop-with-savepoint request stops the job with a FINISHED state.

Actual Behavior:
The stop-with-savepoint request either times out or hangs indefinitely unless, after the request is made, a message arrives in every queue that the job consumes from.


I know that one possible workaround is to send a sentinel value to each of the queues consumed by the job, which the deserialization schema then checks for in its isEndOfStream method. However, this is somewhat cumbersome and complicates the continuous delivery of a Flink job. For example, Ververica Platform will trigger a stop-with-savepoint for the user if one of many possible Flink configurations for a job is changed. The stop-with-savepoint can then hang indefinitely because only some of the RabbitMQ sources will have reached a FINISHED state.
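
For reference, a minimal sketch of that workaround, assuming string messages; the "__STOP__" marker and the class name are illustrative placeholders, not anything Flink defines:

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

// Sketch of the sentinel workaround: the schema reports end-of-stream when a
// designated marker message arrives, which lets the RMQ source subtask finish.
public class SentinelStringSchema extends AbstractDeserializationSchema<String> {

    private static final String SENTINEL = "__STOP__"; // illustrative marker value

    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Returning true here is what lets the source reach FINISHED without
        // waiting on the stop-with-savepoint to unblock it.
        return SENTINEL.equals(nextElement);
    }
}
```

The schema is passed to the source as usual, e.g. new RMQSource<>(connectionConfig, queueName, new SentinelStringSchema()), and a sentinel message then has to be published to every queue the job consumes before stopping.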

I have attached the TaskManager thread dump taken after the stop-with-savepoint request was made. Almost every thread is either sleeping or waiting for locks to be released, and then there are a handful of threads trying to read data from a socket via the com.rabbitmq.client.impl.Frame.readFrom method.

Ideally, once a stop-with-savepoint request is made, the threads trying to read data from RabbitMQ would be interrupted so that all RabbitMQ sources would reach a FINISHED state.

Regular checkpoints and savepoints complete successfully; it is only with the stop-with-savepoint request that I see this behavior.


Respectfully,


Jose Vargas

Software Engineer, Data Engineering

E: [hidden email]


fiscalnote.com | info.cq.com | rollcall.com



Attachment: taskmanager_thread_dump.json (72K)

Re: RabbitMQ source does not stop unless message arrives in queue

austin.ce
Hey Jose,

Thanks for bringing this up – it indeed sounds like a bug. There is ongoing work to update the RMQ source to the new source interface, which should address some of these issues if it does not already; that work is tracked in FLINK-20628 [1]. Would you be able to create a JIRA issue for this, or would you like me to?

At my previous company we only consumed one Rabbit queue per application, so we didn't run into this exactly, but we did see other weird behavior in the RMQ source that could be related. I'm going to cc [hidden email], who might be able to contribute what he's seen working with the source, if he's around. I remember some messages not being properly ack'ed during a stateful shutdown via the Ververica Platform's stop-with-savepoint functionality that you mention, though that might be more related to FLINK-20244 [2].

[1] https://issues.apache.org/jira/browse/FLINK-20628
[2] https://issues.apache.org/jira/browse/FLINK-20244


Best,
Austin



Re: RabbitMQ source does not stop unless message arrives in queue

John Morrow
Hi Jose, hey Austin!!

I know we were just recently looking at trying to consume a fixed number of messages from an RMQ source, process them, and output them to an RMQ sink. As a naive first attempt at stopping the job when the target number of messages had been processed, we put counter state in the process function and tried throwing an exception when the counter reached the target message count (roughly along the lines of the sketch below).

The job had:
  • parallelism: 1
  • checkpointing: 1000 (1 sec)
  • restartStrategy: noRestart
  • prefetchCount: 100
Running it with 150 messages in the input queue and 150 as the target number, the queues at the end had:
  • output queue - 150
  • input queue - 50
So it looks like it did transfer all the messages, but some unack'd ones also got requeued back at the source and so ended up as duplicates. I know throwing an exception in the Flink job is not the same as triggering a stateful shutdown, but it might be hitting similar unack issues.
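
To make the setup concrete, here is a rough sketch of that kind of job, with assumed queue names, connection settings, and class names (StopAfterN is a placeholder, not our actual code):

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.Collector;

public class StopAfterNJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);                                  // parallelism: 1
        env.enableCheckpointing(1000);                          // checkpointing: 1 sec
        env.setRestartStrategy(RestartStrategies.noRestart());  // restartStrategy: noRestart

        RMQConnectionConfig conn = new RMQConnectionConfig.Builder()
                .setHost("localhost").setPort(5672)
                .setUserName("guest").setPassword("guest")
                .setVirtualHost("/")
                .setPrefetchCount(100)                          // prefetchCount: 100
                .build();

        DataStream<String> in = env.addSource(
                new RMQSource<>(conn, "input-queue", new SimpleStringSchema()));

        in.keyBy(msg -> "all")  // single key so the counter lives in keyed state
          .process(new StopAfterN(150))
          .addSink(new RMQSink<>(conn, "output-queue", new SimpleStringSchema()));

        env.execute("stop-after-n");
    }

    // Fails the job once `target` messages have passed through. With noRestart
    // this terminates the job, but any deliveries the source has prefetched and
    // not yet acked are requeued by the broker, hence the duplicates above.
    public static class StopAfterN extends KeyedProcessFunction<String, String, String> {

        private final long target;
        private transient ValueState<Long> seen;

        public StopAfterN(long target) {
            this.target = target;
        }

        @Override
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("seen", Long.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out)
                throws Exception {
            long count = (seen.value() == null ? 0L : seen.value()) + 1;
            seen.update(count);
            out.collect(value);
            if (count >= target) {
                throw new RuntimeException("Reached target of " + target + " messages");
            }
        }
    }
}
```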

John





Re: RabbitMQ source does not stop unless message arrives in queue

austin.ce
Hey all,

Thanks for the details, John! Hmm, that doesn't look too good either 😬 but it is probably a different issue with the RMQ source/sink. Hopefully the new FLIP-27 sources will help you guys out there! The upcoming HybridSource in FLIP-150 [1] might also be interesting to you for finer control over sources.

[hidden email] I've created FLINK-22698 [2] to track your issue. Do you have a small reproducible case/GitHub repo? Also, would you be able to provide a little more detail about the Flink job in which you see this issue, e.g. overall parallelism, the parallelism of the sources/sinks, and the checkpointing mode?

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
[2] https://issues.apache.org/jira/browse/FLINK-22698

Best,
Austin





Re: RabbitMQ source does not stop unless message arrives in queue

jose-pvargas
Hi all,

Apologies for not following up sooner. Thank you Austin for creating FLINK-22698. It seems that the issue is well understood and a fix is currently under development/review. Please let me know if there is anything additional that I can do. I look forward to testing out a new version of Flink that includes this fix.

Thanks again,
Jose


