Hi,

For a Flink batch job, some values are written to Kafka through a producer. I want to register a hook that closes (at the end) the Kafka producer a worker is using; the hook should, of course, be executed on the worker side.

Is there a way to do so?

Thanks.

Regards,
Dominique
Hi,

Flink's ProcessFunction has a close() method, which is called on the worker when the task running the function shuts down. (You could also use any of the Rich* functions for that purpose.) If you add a ProcessFunction with the same parallelism before the KafkaSink, it will be executed on the same machines as the Kafka producer.

AFAIK, the close() call should not take too long, as the system might interrupt your thread if it doesn't finish closing in time (30s is the default for "cluster.services.shutdown-timeout").

Best,
Robert

On Tue, Jan 21, 2020 at 10:02 AM Dominique De Vito <[hidden email]> wrote:
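A minimal, self-contained sketch of the open()/close() lifecycle described above, using only the JDK. Operator, ProducingOperator, FakeProducer and runTask are hypothetical stand-ins written for illustration, not Flink classes. In a real job the operator would be a ProcessFunction or another Rich* function, the producer would be a KafkaProducer, and the Flink runtime (not runTask) would call open() and close() on the worker.

```java
import java.util.ArrayList;
import java.util.List;

public class TaskLifecycleSketch {

    // Hypothetical model of a rich function's lifecycle hooks (NOT Flink's RichFunction).
    interface Operator<T> {
        void open();
        void process(T value);
        void close();
    }

    // Hypothetical producer stand-in; a real job would hold a KafkaProducer here.
    static final class FakeProducer implements AutoCloseable {
        final List<String> sent = new ArrayList<>();
        boolean closed = false;

        void send(String record) {
            if (closed) {
                throw new IllegalStateException("producer already closed");
            }
            sent.add(record);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Creates the producer in open() and releases it in close(), on the same worker.
    static final class ProducingOperator implements Operator<String> {
        FakeProducer producer;

        @Override
        public void open() {
            producer = new FakeProducer();
        }

        @Override
        public void process(String value) {
            producer.send(value);
        }

        @Override
        public void close() {
            if (producer != null) {
                producer.close();
            }
        }
    }

    // Stand-in for the runtime: this driver always calls close(), even if process() throws.
    static <T> void runTask(Operator<T> op, List<T> input) {
        op.open();
        try {
            for (T value : input) {
                op.process(value);
            }
        } finally {
            op.close();
        }
    }

    public static void main(String[] args) {
        ProducingOperator op = new ProducingOperator();
        runTask(op, List.of("a", "b", "c"));
        System.out.println(op.producer.sent + " closed=" + op.producer.closed); // prints "[a, b, c] closed=true"
    }
}
```

The point of the pattern is that the producer's lifetime is tied to the task, so it is closed where it was created, without any JVM-level hook.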
Hi Robert,

Thanks for your hint / reply / help.

So far I have not tested your approach (maybe next), but I tried another one:
* use mapPartitions
  - at the beginning, get a KafkaProducer
  - the KafkaProducerFactory class I use is lazy and caches the first instances created, so there is reuse.
* register a JVM hook for closing the KafkaProducer.

So far I have run into some performance issues, but I don't know yet whether they are due to my pattern or to something else.

Anyway, thanks.

Regards,
Dominique

On Fri, Jan 31, 2020 at 2:20 PM Robert Metzger <[hidden email]> wrote:
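A rough sketch of the "lazy, caching factory plus JVM shutdown hook" approach described above, assuming only the JDK. CachingResourceFactory and its methods are hypothetical names (this is not the actual KafkaProducerFactory from this thread), and AutoCloseable stands in for KafkaProducer so the example compiles without Kafka on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical helper: one lazily created instance per key, closed by a JVM shutdown hook.
public class CachingResourceFactory<R extends AutoCloseable> {

    private final Map<String, R> cache = new ConcurrentHashMap<>();
    private final Supplier<R> creator;

    public CachingResourceFactory(Supplier<R> creator, boolean registerShutdownHook) {
        this.creator = creator;
        if (registerShutdownHook) {
            // Runs only when this JVM (e.g. the TaskManager process) exits, not when a job ends.
            Runtime.getRuntime().addShutdownHook(new Thread(this::closeAll, "close-cached-resources"));
        }
    }

    // Lazily creates the instance for this key on first use, then returns the cached one.
    public R get(String key) {
        return cache.computeIfAbsent(key, k -> creator.get());
    }

    // Closes and forgets every cached instance; calling it again is a no-op.
    public void closeAll() {
        for (String key : cache.keySet()) {
            R resource = cache.remove(key);
            if (resource != null) {
                try {
                    resource.close();
                } catch (Exception e) {
                    System.err.println("Failed to close cached resource '" + key + "': " + e);
                }
            }
        }
    }

    public int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        CachingResourceFactory<AutoCloseable> factory = new CachingResourceFactory<AutoCloseable>(
                () -> () -> System.out.println("closed by shutdown hook"), true);
        AutoCloseable first = factory.get("default");
        System.out.println("same instance: " + (first == factory.get("default")));
    }
}
```

The hook only fires when the JVM itself exits; if the TaskManager process outlives the job, the cached producers stay open until then.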
Hi,

I now realize that you are using the batch API, and I gave you an answer for the streaming API :(

The rich variant of the mapPartition function (RichMapPartitionFunction) also has a close() method, which you can use to implement the same pattern.

With a JVM shutdown hook, you are assuming that the TaskManager shuts down at the end of your job. Whether this happens depends on the deployment method: you might get unexpected results if you deploy Flink in a way that keeps the TaskManagers around.

On Tue, Feb 18, 2020 at 1:34 PM Dominique De Vito <[hidden email]> wrote:
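One way to combine the close() hook with the producer reuse from the previous message, sketched under assumptions: SharedResource is a hypothetical helper (not a Flink API), and AutoCloseable stands in for KafkaProducer (which implements Closeable, so it would fit the type bound). Each RichMapPartitionFunction instance would call acquire() in open() and release() in close(); the last instance to close closes the shared producer, so nothing depends on the JVM shutting down.

```java
import java.util.function.Supplier;

// Hypothetical reference-counted holder: created on first acquire(), closed on the last release().
public final class SharedResource<R extends AutoCloseable> {

    private final Supplier<R> creator;
    private R resource; // guarded by "this"
    private int users;  // acquire() calls not yet matched by release()

    public SharedResource(Supplier<R> creator) {
        this.creator = creator;
    }

    // Intended to be called from open(): creates the resource on first use, then reuses it.
    public synchronized R acquire() {
        if (resource == null) {
            resource = creator.get();
        }
        users++;
        return resource;
    }

    // Intended to be called from close(): the last user closes the resource.
    public synchronized void release() {
        if (users == 0) {
            throw new IllegalStateException("release() called without a matching acquire()");
        }
        users--;
        if (users == 0) {
            R toClose = resource;
            resource = null;
            try {
                toClose.close();
            } catch (Exception e) {
                throw new IllegalStateException("failed to close shared resource", e);
            }
        }
    }

    public static void main(String[] args) {
        SharedResource<AutoCloseable> shared =
                new SharedResource<AutoCloseable>(() -> () -> System.out.println("producer closed"));
        shared.acquire(); // open() of partition 1
        shared.acquire(); // open() of partition 2
        shared.release(); // close() of partition 1: still in use, nothing printed
        shared.release(); // close() of partition 2: the shared producer is closed here
    }
}
```

If the helper is kept in a static field, it is shared only by instances that load the class through the same class loader in the same TaskManager JVM; the sketch also assumes every acquire() is eventually matched by a release().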