Freeing resources in SourceFunction

Yury Ruchin
Hello,

I'm writing a custom source function for my streaming job. The source function manages a connection pool, and I want to close that pool once my job is "finished" (since the stream is unbounded, the only way I see is to cancel the streaming job). Since I inherit from RichSourceFunction, there are two candidates: cancel() and close(). I'm wondering which one should be picked. Looking for best practices, I turned to the existing sources. One example is FlinkKafkaConsumerBase, which implements both callbacks identically (one delegating to the other). A counterexample is InputFormatSourceFunction, which uses cancel() only to reset a flag, while the actual cleanup is done in close(). Which of these approaches is a better fit in the described case?

Just FYI, the Flink version I use is 1.1.2.

Thanks,
Yury

Re: Freeing resources in SourceFunction

Maximilian Michels
For your use case you should use the close() method, which is always
called upon shutdown of your source. The cancel() method is only called
when you explicitly cancel your job.

-Max
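
The split Max describes (cancel() only stops the loop, close() frees the pool) can be sketched as below. To stay runnable without Flink on the classpath, this is a plain-Java model that only mirrors the open()/run()/cancel()/close() names of RichSourceFunction; ConnectionPool, PooledSource, lastPool and the maxRecords bound are hypothetical, and main() plays the role of the runtime. cancel() is typically invoked from a different thread than run(), so it just flips a volatile flag that run() polls. close() releases the pool and is written defensively: it is a no-op if the pool was never created or was already closed.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java MODEL of the RichSourceFunction lifecycle (open -> run -> cancel -> close).
// Not the Flink API: in a real job you would extend RichSourceFunction instead.
public class PooledSourceSketch {

    /** Hypothetical stand-in for the connection pool the source manages. */
    static final class ConnectionPool {
        final AtomicInteger closeCount = new AtomicInteger();
        String fetch() { return "record"; }
        void close() { closeCount.incrementAndGet(); }
    }

    /** Hypothetical source following the "cancel() signals, close() cleans up" pattern. */
    static final class PooledSource {
        private volatile boolean running = true; // written by cancel(), read by run()
        private ConnectionPool pool;
        ConnectionPool lastPool;                 // kept only so the demo can inspect it

        void open() {
            pool = new ConnectionPool();
            lastPool = pool;
        }

        // Emits until cancel() flips the flag; maxRecords only bounds the demo.
        long run(long maxRecords) {
            long emitted = 0;
            while (running && emitted < maxRecords) {
                pool.fetch();
                emitted++;
            }
            return emitted;
        }

        // Only signals run() to stop; no resource cleanup here.
        void cancel() { running = false; }

        // Releases resources; safe if open() never created the pool or close() runs twice.
        void close() {
            if (pool != null) {
                pool.close();
                pool = null;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 1) run() returns on its own; the "runtime" then calls close().
        PooledSource bounded = new PooledSource();
        bounded.open();
        long n = bounded.run(5);
        bounded.close();
        System.out.println("bounded: emitted=" + n + ", pool closes=" + bounded.lastPool.closeCount.get());

        // 2) cancel() arrives from another thread; close() still does the cleanup.
        PooledSource unbounded = new PooledSource();
        unbounded.open();
        Thread task = new Thread(() -> unbounded.run(Long.MAX_VALUE));
        task.start();
        Thread.sleep(50);
        unbounded.cancel();
        task.join();
        unbounded.close();
        unbounded.close(); // a second close() is harmless
        System.out.println("cancelled: pool closes=" + unbounded.lastPool.closeCount.get());
    }
}
```

In an actual Flink job the same bodies would go into a class extending RichSourceFunction, with open(Configuration) creating the pool and run(SourceContext) emitting records via ctx.collect(). Keeping cleanup out of cancel() avoids tearing down the pool while run() may still be using it on the other thread.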


On Thu, Nov 3, 2016 at 2:45 PM, Yury Ruchin wrote:

> [original message quoted in full above]