RMQSource synchronous message ack

RMQSource synchronous message ack

gcandal
Hi,

I recently opened a Stack Overflow question about latency spikes (~500ms) after a checkpoint operation, even though the operation itself was relatively fast (~50ms).

I've come to realize that the cause of the latency was that the job was waiting for the RMQSource to run acknowledgeSessionIDs during notifyCheckpointComplete.

I've noticed that the Kafka connectors do the equivalent operation (committing offsets) asynchronously, at least from 09 onwards. My question to you is: can you see any reason why this acknowledgement has to be synchronous on RabbitMQ?

I believe it should be ok, given that those messages are already reflected in the checkpointed state, but I'm not sure if there are any negative consequences correctness-wise.

Thanks,


Re: RMQSource synchronous message ack

Chesnay Schepler
The acknowledgement has to be synchronous since Flink assumes that after notifyCheckpointComplete() all data has been persisted to external systems. For example, if records 1 to 100 were passed to the sink and a checkpoint then occurs and completes, on restart Flink would continue with record 101. But if the sink does not synchronously wait for all updates to be persisted, the checkpoint may finish while an update (say, for record 99) is still being sent asynchronously; if that update is then lost, Flink will _still_ resume from record 101.
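
The record 1 to 100 example can be played through with a small toy model. This is only a sketch and does not use any Flink API: the function name `run`, its two flags, and the choice of record 99 as the in-flight update are all assumptions made for illustration.

```python
def run(sync_flush: bool, crash_before_async_write: bool):
    """Toy model of a sink receiving records 1..100 around one checkpoint."""
    persisted = set()   # what the external system has durably stored
    pending = []        # updates accepted by the sink but still in flight

    for record in range(1, 101):
        if record == 99:
            pending.append(record)   # assumption: this one update is slow
        else:
            persisted.add(record)

    if sync_flush:
        # Synchronous sink: wait for in-flight updates before the
        # checkpoint is allowed to count as complete.
        persisted.update(pending)
        pending.clear()

    resume_from = 101   # checkpoint completed, so a restart continues here

    if crash_before_async_write:
        pending.clear()              # in-flight update is lost with the process
    else:
        persisted.update(pending)    # the asynchronous write eventually lands

    # Records before the resume point that never reached the external system.
    missing = sorted(set(range(1, resume_from)) - persisted)
    return resume_from, missing
```

In this model only the combination of an asynchronous sink and a crash before the late write leaves a gap: the job still resumes from 101, and record 99 is never written.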

In reply to Gabriel Candal's message of 05.03.2019 15:07.


Re: RMQSource synchronous message ack

gcandal
First of all, thanks for your time and quick response.

I'm not completely sure I understood your example, but is this what you
mean:

- Sink processes A, B, C
- Checkpoint persisted with A, B, C
- Notify checkpoint starts
- Notify checkpoint ACKs A
- Notify checkpoint ACKs B
- Job crashes
- Job resumes from checkpoint with A, B, C
- Job will re-process C because it was not ACK'ed

Was this it?
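
To make the broker side of that sequence concrete, here is a toy model of a queue that requeues unacknowledged messages when the consumer's connection goes away. `ToyBroker` and `crash_mid_ack` are hypothetical names, not RabbitMQ or Flink classes, and the sketch deliberately does not model any deduplication the source might do against its restored state, so it only shows which message comes back, not whether the job processes it again.

```python
class ToyBroker:
    """Minimal stand-in for a queue with manual acknowledgements."""

    def __init__(self, messages):
        self.queue = list(messages)   # messages waiting to be delivered
        self.unacked = []             # delivered but not yet acknowledged

    def deliver_all(self):
        delivered, self.queue = self.queue, []
        self.unacked.extend(delivered)
        return delivered

    def ack(self, message):
        self.unacked.remove(message)

    def connection_lost(self):
        # Unacknowledged messages go back to the front of the queue.
        self.queue = self.unacked + self.queue
        self.unacked = []


def crash_mid_ack():
    broker = ToyBroker(["A", "B", "C"])
    broker.deliver_all()        # job processes A, B, C; checkpoint covers them
    broker.ack("A")             # notify checkpoint ACKs A
    broker.ack("B")             # notify checkpoint ACKs B
    broker.connection_lost()    # job crashes before C is ACKed
    return broker.deliver_all() # what the restarted job receives again
```

Under these assumptions the restarted consumer is handed only C again.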

In a non-failure scenario (e.g. asking the job to stop) the job wouldn't
finish before the notify is complete, even though it's asynchronous:

- Sink processes A, B, C
- Checkpoint persisted with A, B, C
- Notify checkpoint starts
- Notify checkpoint ACKs A
- Notify checkpoint ACKs B
- Job asked to stop
- Job waiting for notify to end
- Notify checkpoint ACKs C
- Job stops

This seems to be the behaviour of Kafka09Fetcher + KafkaConsumerThread, or
is there anything I'm overlooking?
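
The pattern I have in mind looks roughly like the sketch below: the notify call only enqueues the IDs and returns, a background thread does the actual ACKs, and stopping waits for that thread to drain. This is not the code of Kafka09Fetcher or KafkaConsumerThread, just my reading of the idea; `AsyncAcker` and its method names are made up for the example.

```python
import queue
import threading


class AsyncAcker:
    """Hands ACKs to a background thread; stop() waits for them to finish."""

    _STOP = object()  # sentinel that tells the worker to exit

    def __init__(self):
        self.acked = []               # stands in for ACKs sent to the broker
        self._work = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            item = self._work.get()
            if item is self._STOP:
                return
            self.acked.append(item)   # a real source would ACK on the channel

    def notify_checkpoint_complete(self, ids):
        # Returns immediately; the ACKs happen on the worker thread.
        for message_id in ids:
            self._work.put(message_id)

    def stop(self):
        # Queue the sentinel behind any pending ACKs, then wait for the worker.
        self._work.put(self._STOP)
        self._thread.join()
```

Calling `notify_checkpoint_complete(["A", "B", "C"])` followed by `stop()` leaves all three IDs acknowledged in order, because the sentinel is queued behind them. What it does not cover is the crash case, where the worker dies with IDs still in the queue.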



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/