Am I correct in assuming that the Kafka producer sink can lose messages?
I don't expect exactly-once semantics when using Kafka as a sink, given Kafka's publishing guarantees, but I do expect at-least-once. I gather from reading the source that the producer publishes messages asynchronously, as expected, and that a callback can record publishing errors, which are raised when detected. But as far as I can tell, there is no barrier to wait for async errors from the sink when checkpointing. Did I miss it?

To do so you'd have to call the flush() method on the KafkaProducer, which would block until all pending requests succeeded or failed. Given that FlinkKafkaProducer09 is just a SinkFunction with a simple invoke() method, there doesn't appear to be a way to ensure the sink has published all pending writes successfully. So it seems that if a checkpoint occurs while there are pending publish requests, and the requests return a failure after the checkpoint occurred, those messages will be lost, as the checkpoint will consider them processed by the sink.

It seems as if there is an expectation that SinkFunction is synchronous. Maybe there is a need for an AsyncSinkFunction interface with a method to block until messages are flushed, or the sink should keep track of which messages have been successfully published so that the information can be used by the checkpointing system.
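The race described above can be sketched with a mock asynchronous producer (plain Java, no actual Flink or Kafka APIs; all class and method names here are made up for illustration): invoke() hands the record off and returns before the broker responds, and a checkpoint that does not wait for in-flight requests never sees the late failure.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

// Sketch of the loss scenario: the "broker" answers asynchronously, and
// nothing forces the checkpoint to wait for outstanding sends.
public class NaiveSink {
    private final ExecutorService producer = Executors.newSingleThreadExecutor();
    private final CountDownLatch brokerResponds = new CountDownLatch(1);
    private final AtomicReference<Exception> asyncError = new AtomicReference<>();

    // Mimics an async send(record, callback): returns before the write lands.
    public void invoke(String record) {
        producer.submit(() -> {
            try {
                brokerResponds.await(); // broker has not answered yet
            } catch (InterruptedException ignored) { }
            // The "callback" records the failure, but only after the fact.
            asyncError.compareAndSet(null, new Exception("publish failed: " + record));
        });
    }

    // A checkpoint taken now sees no error, even though a send will fail.
    public boolean checkpointSeesError() {
        return asyncError.get() != null;
    }

    // Let the broker respond (with a failure) and drain the producer thread.
    public void failLater() throws InterruptedException {
        brokerResponds.countDown();
        producer.shutdown();
        producer.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

After the checkpoint acks, the failure finally surfaces, but by then the checkpoint already considers the record processed.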
You raised a good point. Fortunately, there should be a simple way to fix this.

The Kafka sink function should implement the "Checkpointed" interface. It will get a call to the "snapshotState()" method whenever a checkpoint happens. Inside that call, it should sync on the callbacks, and only return once all have completed. It may return null (no need to store anything in the checkpoint). Until "snapshotState()" returns, the checkpoint will not complete. That way, there will be a "synchronization" point per checkpoint.

We can even improve this further in the future: the checkpoint method can return an async state handle. While the async state handle completes its "wait for callbacks" in the background (and only acks the checkpoint after that has completed), the sink function can continue processing.

What do you think?

Stephan

On Sat, Jun 4, 2016 at 4:05 AM, Elias Levy <[hidden email]> wrote:
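The fix Stephan suggests, blocking inside snapshotState() until all callbacks have fired and rethrowing any recorded error, can be sketched roughly like this (a simplified stand-in using plain Java; the class and method names only mirror the real Checkpointed/SinkFunction interfaces and a mock executor plays the role of the Kafka producer):

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

// Sketch: count in-flight sends, have the "callback" decrement the count
// and record failures, and make the snapshot wait for the count to drain.
public class FlushingSink {
    private final ExecutorService producer = Executors.newSingleThreadExecutor();
    private final AtomicInteger pending = new AtomicInteger();
    private final AtomicReference<Exception> asyncError = new AtomicReference<>();

    // Analogous to SinkFunction.invoke(): hand the record off asynchronously.
    public void invoke(String record) {
        pending.incrementAndGet();
        producer.submit(() -> {
            try {
                if (record.contains("bad")) {
                    asyncError.compareAndSet(null, new Exception("publish failed: " + record));
                }
            } finally {
                pending.decrementAndGet(); // the "callback" fires
            }
        });
    }

    // Analogous to Checkpointed.snapshotState(): wait for all callbacks,
    // then fail the checkpoint if any send failed. Returns null, since
    // there is nothing to store in the checkpoint itself.
    public Object snapshotState() throws Exception {
        while (pending.get() > 0) {
            Thread.sleep(1); // a real sink would use flush()/wait-notify
        }
        Exception e = asyncError.get();
        if (e != null) {
            throw e; // checkpoint fails; the job restores and replays
        }
        return null;
    }

    public void close() { producer.shutdown(); }
}
```

The key property is that the checkpoint cannot complete while any send issued before it is still unresolved, which restores at-least-once behavior.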
On Sun, Jun 5, 2016 at 3:16 PM, Stephan Ewen <[hidden email]> wrote:
I opened FLINK-4027 to track the issue.

That seems like an acceptable solution. Presumably an exception can be raised in snapshotState() if there is a Kafka publishing error when calling flush() on the Kafka producer, which will cause the checkpoint to fail.

I do wonder what sort of performance penalty using flush() will incur, as it is a synchronous call. I assume no other messages can be processed by the sink while inside snapshotState(). In theory a sink could continue processing messages, so long as it kept track of the pending messages sent before the barrier and only responded to the snapshotState() call once none of those remained.
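The idea in the last paragraph, tracking which pending sends precede the barrier so the snapshot only waits for those, might look roughly like this (a hypothetical sketch with made-up names; a production version would need to handle the race between invoke() and the epoch roll-over more carefully):

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

// Sketch: tag each in-flight send with the checkpoint "epoch" current at
// send time; a snapshot seals the current epoch and waits only for it.
public class EpochTrackingSink {
    private final ExecutorService producer = Executors.newSingleThreadExecutor();
    private final ConcurrentHashMap<Long, AtomicInteger> inFlight = new ConcurrentHashMap<>();
    private volatile long epoch = 0; // only the snapshot thread advances this

    public void invoke(String record) {
        long e = epoch; // the record belongs to the epoch current at send time
        inFlight.computeIfAbsent(e, k -> new AtomicInteger()).incrementAndGet();
        producer.submit(() -> inFlight.get(e).decrementAndGet()); // the "callback"
    }

    // Seal the current epoch and wait only for its in-flight sends;
    // records from the next epoch keep flowing. Returns the sealed epoch.
    public long snapshotState() throws InterruptedException {
        long sealed = epoch++;
        AtomicInteger n = inFlight.getOrDefault(sealed, new AtomicInteger());
        while (n.get() > 0) {
            Thread.sleep(1);
        }
        inFlight.remove(sealed);
        return sealed;
    }

    public void close() { producer.shutdown(); }
}
```

Compared with a plain flush(), this keeps the sink accepting post-barrier records while the pre-barrier ones drain.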
Hi Elias!

The concern you raised about the sink being synchronous is exactly what my last suggestion should address: the internal state backends can return a handle that does the sync in a background thread. The sink would continue processing messages, and the checkpoint would only be acknowledged after the background sync completes. We should allow user code to return such a handle as well.

We have to think about the implications for message order, though...

Greetings,
Stephan

On Mon, Jun 6, 2016 at 11:58 PM, Elias Levy <[hidden email]> wrote:
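The async-handle variant Stephan describes could be sketched like this (assumed names, not an actual Flink API): the snapshot call returns immediately with a future, the sink keeps processing, and the checkpoint would only be acknowledged once the future completes.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

// Sketch: instead of blocking in the snapshot call, return a handle that
// completes in the background once the outstanding callbacks have drained.
public class AsyncHandleSink {
    private final ExecutorService producer = Executors.newFixedThreadPool(2);
    private final AtomicInteger pending = new AtomicInteger();

    public void invoke(String record) {
        pending.incrementAndGet();
        producer.submit(() -> {
            try {
                Thread.sleep(5); // pretend the broker takes a moment to ack
            } catch (InterruptedException ignored) { }
            pending.decrementAndGet();
        });
    }

    // The "async state handle": the caller returns immediately; the
    // checkpoint is acknowledged only when this future completes.
    // (Simplified: it waits for ALL pending sends to drain, which is
    // over-conservative but still safe.)
    public CompletableFuture<Void> snapshotState() {
        return CompletableFuture.runAsync(() -> {
            while (pending.get() > 0) {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException ignored) { }
            }
        });
    }

    public void close() throws InterruptedException {
        producer.shutdown();
        producer.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

This is the shape of the trade-off discussed in the thread: the sink stays non-blocking, at the cost of deferring the checkpoint acknowledgment to a background completion.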
On Tue, Jun 7, 2016 at 4:52 AM, Stephan Ewen <[hidden email]> wrote:
Sorry, apparently I hadn't had enough coffee and completely missed the last paragraph of your response. The async solution you propose seems ideal.

What message ordering guarantees are you worried about? I don't think you can do much about guaranteeing message ordering within Kafka in case of failure, since you'll replay some messages. And there isn't any guarantee if you are writing to a Kafka topic with multiple partitions from multiple sinks using a message key distinct from the key used in a keyBy in Flink, as you'll be writing from multiple sink instances in parallel in what is essentially a shuffle. It would seem the only ordering guarantee is when a sink writes to a Kafka topic using a message key that is the same as the key used in a keyBy in Flink, and even that will be violated during a failure and replay by the sink.