Apache Flink Kafka Stream get all messages and stop


k.leshakov@yandex.ru
Hello everyone!
 
 
What I want to do: using the Flink DataStream API, create a Kafka consumer, read all messages in the topic up to the current moment, and then stop the consumer. Stopping is the main problem.
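To make the intended behaviour concrete, here is a minimal sketch in plain Java, with no Flink or Kafka involved. The names BoundedReadSketch and readUntilSnapshot are made up for illustration, and the list is only a stand-in for a single topic partition. The idea is to record the end offset once before reading starts, read up to it, and ignore anything appended afterwards.

```java
import java.util.ArrayList;
import java.util.List;

public class BoundedReadSketch {

    // Hypothetical stand-in for one topic partition: an append-only list of messages.
    // "producer" is invoked after every read to simulate new messages arriving.
    public static List<String> readUntilSnapshot(List<String> partition, Runnable producer) {
        // Snapshot the end offset once, before any message is consumed.
        final int stopOffset = partition.size();
        List<String> consumed = new ArrayList<>();
        for (int offset = 0; offset < stopOffset; offset++) {
            consumed.add(partition.get(offset));
            // Messages appended from here on land beyond stopOffset and are not read.
            producer.run();
        }
        // Reaching stopOffset is the signal to stop instead of idling for more data.
        return consumed;
    }

    public static void main(String[] args) {
        List<String> partition = new ArrayList<>(List.of("m0", "m1", "m2"));
        List<String> result = readUntilSnapshot(partition, () -> partition.add("late"));
        System.out.println(result); // prints [m0, m1, m2]
    }
}
```

In this sketch the "late" messages stay in the partition but are never returned, which is the behaviour I am after: the job should finish once it reaches the offsets that existed when it started.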
 
However, I have no idea how to do that. With a DataStream I can pass the consumer as a source, but how do I stop it once all messages have been consumed (i.e. the consumer should not sit idle)? One idea was to write a timeout function and pass it to streamenv.process. However, that did not work, because my consumer was never closed (this is described in more detail in my Stack Overflow question).
 
Is there any other solution? Or should I continue trying with TimeoutFunction? Could you please help? Thanks in advance!