Hi Kostas,
Thanks for pointing me in the right direction. I have extended MessageAcknowledgingSourceBase, and it was quite easy to do. However, I have some follow-up questions about the guarantees it gives and about testing my solution.

1. Guarantees
Questions:
a. When the acknowledgeIDs method is called, is it certain that all the other operators, including the sinks, have finished successfully? For example, if I have one sink that writes to MySQL/Cassandra and one that writes to SQS/Kafka, will the writes to both of these systems have completed successfully before acknowledgeIDs is called?
b. Messages can be duplicated if processing takes longer than the queue timeout, or if there are failures and Flink needs to recover. This is a problem for sinks that write to non-idempotent systems, e.g. SQS or Kafka. What are the recommended approaches to avoid this duplication? Are there any example repos?

2. Testing
Work done so far: to convince myself that this source is indeed reliable, I wrote some integration tests. I used LocalFlinkMiniCluster, which is quite nice. However, testing what happens when I kill the TaskManager running my MessageAcknowledgingSourceBase task was not so straightforward. I managed to get around it by starting the cluster and the job, getting all the TaskManager actor references, adding a new TaskManager to the cluster, and then killing the initial TaskManagers by sending them a PoisonPill actor message. I had to kill all the initial TaskManagers because I did not find a way to map the task running the source to the TaskManager actor it was assigned to.
Questions:
a. Is there a better API for fine-grained killing of individual services, tasks, or resources in a Flink cluster or job?
b. Can you point me to a repo with reliability tests for Flink, i.e. tests where things are killed to check whether the system recovers?

Thanks,
M

On Tue, Apr 25, 2017 at 9:23 AM, Kostas Kloudas <[hidden email]> wrote:
Hi Martin! Let me try to follow up on some of your questions :)
On 1a: that is correct. `MessageAcknowledgingSourceBase#acknowledgeIDs` is essentially called from within `notifyCheckpointComplete()`, and a checkpoint is only reported as complete once all sinks have finished their snapshots for it. For sinks such as Cassandra and Kafka, the snapshot method currently flushes all pending in-flight write requests to the external system. So yes, you can be sure that by the time IDs are acknowledged, all sinks have completed the writes for those IDs.
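The ordering described above can be illustrated with a toy model in plain Python. This is not Flink's implementation: `BufferingSink`, `AckingSource`, and `run_checkpoint` are hypothetical names, and the synchronous `emit` stands in for records travelling through the job ahead of the checkpoint barrier. It only shows the invariant that matters for 1a: an ID is acknowledged only after every sink has flushed the records emitted before the checkpoint.

```python
class BufferingSink:
    """Hypothetical sink: writes are buffered and only become durable on flush."""

    def __init__(self, name):
        self.name = name
        self.pending = []  # in-flight write requests
        self.durable = []  # writes confirmed by the external system

    def invoke(self, record):
        self.pending.append(record)

    def snapshot(self):
        # As described for the Cassandra/Kafka sinks: flush in-flight requests.
        self.durable.extend(self.pending)
        self.pending.clear()


class AckingSource:
    """Toy source that acknowledges message IDs only on checkpoint completion."""

    def __init__(self):
        self.ids_in_current_checkpoint = []
        self.pending_checkpoints = {}  # checkpoint id -> message IDs it covers
        self.acknowledged = []

    def emit(self, msg_id, sinks):
        self.ids_in_current_checkpoint.append(msg_id)
        for sink in sinks:
            sink.invoke(msg_id)

    def snapshot(self, checkpoint_id):
        self.pending_checkpoints[checkpoint_id] = self.ids_in_current_checkpoint
        self.ids_in_current_checkpoint = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Acknowledge the IDs of this checkpoint and of any older pending one.
        for cid in sorted(c for c in self.pending_checkpoints if c <= checkpoint_id):
            self.acknowledged.extend(self.pending_checkpoints.pop(cid))


def run_checkpoint(checkpoint_id, source, sinks):
    """Hypothetical coordinator: completion is notified only after all snapshots."""
    source.snapshot(checkpoint_id)
    for sink in sinks:
        sink.snapshot()
    source.notify_checkpoint_complete(checkpoint_id)


sinks = [BufferingSink("cassandra"), BufferingSink("kafka")]
source = AckingSource()
for msg_id in ["m1", "m2"]:
    source.emit(msg_id, sinks)
run_checkpoint(1, source, sinks)
source.emit("m3", sinks)  # not covered by any completed checkpoint yet
```

After this run, `m1` and `m2` are acknowledged and durable in both sinks, while `m3` is still in flight and unacknowledged. If the job failed at this point, an acknowledging queue would eventually redeliver `m3`.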
On 1b: duplication is a general problem for non-idempotent pipelines, and it is hard to avoid when the external sink does not support transactions. I'm not aware of any cookbook solutions for this.
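To make the duplication concrete, here is a toy model in plain Python, not Flink code; `run_job`, `InjectedFailure`, and `duplicates` are hypothetical names. The source always rewinds to its checkpointed offset after a failure. With `rollback_sink=True` the sink's output is also restored to the checkpoint, standing in for a sink whose writes only become visible when a checkpoint completes (the transactional case). With `rollback_sink=False` the sink behaves like a plain external system: writes made after the last checkpoint cannot be undone, so replayed records show up twice. The `duplicates` helper is the kind of exactly-once check an integration test could assert on.

```python
from collections import Counter


class InjectedFailure(Exception):
    """Hypothetical failure thrown once, mid-stream, to force a recovery."""


def run_job(num_elements, checkpoint_every, fail_at, rollback_sink):
    """
    Toy at-least-once replay. The source offset and the sink output are
    checkpointed together; one failure is injected when the source reaches
    `fail_at`. If `rollback_sink` is False, writes made after the last
    checkpoint stay in the output after recovery.
    """
    checkpoint = (0, [])  # (source offset, sink output at checkpoint time)
    offset, output = 0, []
    failed = False
    while offset < num_elements:
        try:
            if offset == fail_at and not failed:
                failed = True
                raise InjectedFailure()
            output.append(offset)  # the "write" to the external system
            offset += 1
            if offset % checkpoint_every == 0:
                checkpoint = (offset, list(output))
        except InjectedFailure:
            # Recovery: the source always rewinds to the checkpointed offset.
            offset = checkpoint[0]
            if rollback_sink:
                output = list(checkpoint[1])
    return output


def duplicates(output):
    """Exactly-once check: values that reached the sink more than once."""
    return sorted(v for v, n in Counter(output).items() if n > 1)


transactional = run_job(10, checkpoint_every=3, fail_at=7, rollback_sink=True)
plain = run_job(10, checkpoint_every=3, fail_at=7, rollback_sink=False)
```

Here the last checkpoint before the failure is at offset 6, so record 6 is written once before the crash and again after recovery. Only the non-rolled-back sink keeps both copies.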
On 2b: the Flink Kafka consumer actually has quite a few tests for exactly-once guarantees; you can take a look at them here [1]. In particular, look at the `testOneToOneSources`, `testOneSourceMultiplePartitions`, etc. tests. I think they are quite good examples of how to write exactly-once tests.

- Gordon

On 8 May 2017 at 1:08:44 PM, Martin Eden ([hidden email]) wrote: