ProcessFunction collect and close, when to use?

shuwen zhou
Hi Community,
In the ProcessFunction class, the processElement function is passed a Collector that has two methods: collect() and close(). I would like to know:

1. When should close() be called? After every element is processed? In ProcessFunction.close()? Or never? If the collector has already been closed, can it still collect() any more data?
2. If processElement receives a message but decides to discard it and does not call collect(), will this block the checkpoint barrier until the next element is emitted via collect()?


--
Best Wishes,
Shuwen Zhou

Re: ProcessFunction collect and close, when to use?

bupt_ljy

Hi Shuwen,


> When should close() be called? After every element is processed? In ProcessFunction.close()? Or never?

IMO, close() manages the lifecycle of the Collector itself, not of a single element. I don't think it should be called in a user function unless you have some special use case (none comes to my mind).

> If the collector has already been closed, can it still collect() any more data?

No. If it’s closed, it usually means the writer is closed, or maybe the operator is closed.

> If processElement receives a message but discards it and does not call collect(), will this block the checkpoint barrier until the next element is emitted via collect()?

No.



Best,

Jiayi Liao



 Original Message 
Sender: shuwen zhou<[hidden email]>
Recipient: user<[hidden email]>
Date: Friday, Nov 29, 2019 12:29
Subject: ProcessFunction collect and close, when to use?

Re: ProcessFunction collect and close, when to use?

shuwen zhou
Thank you Jiayi, that helps a lot!

On Fri, 29 Nov 2019 at 13:44, bupt_ljy <[hidden email]> wrote:


--
Best Wishes,
Shuwen Zhou

Re: ProcessFunction collect and close, when to use?

Chesnay Schepler
In reply to this post by shuwen zhou
1) You should never call close() on the collector; Flink will do that automatically.
2) No, it shouldn't block anything. Flink will look at the next record to process, notice it's a barrier and pass it on immediately.
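
To make both points concrete, here is a minimal sketch in plain Java. It is not Flink code: so that it runs without Flink on the classpath, it declares its own small Collector interface with the same two methods as org.apache.flink.util.Collector, and the names DropNegatives, ToyOperator and BARRIER are made up for illustration. The input loop is only a toy model of what an operator does, under the assumption that barriers arrive in the same input sequence as records; the real runtime is far more involved.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for org.apache.flink.util.Collector, which declares these same two methods.
interface Collector<T> {
    void collect(T record);
    void close();
}

// Hypothetical user function: forwards non-negative values and drops the rest.
final class DropNegatives {
    void processElement(Integer value, Collector<Integer> out) {
        if (value < 0) {
            return; // discard: no collect() call, and no close() call either
        }
        out.collect(value);
    }
}

// Toy model of an operator's input loop. This is an assumption made for
// illustration, NOT Flink's actual runtime code.
final class ToyOperator {
    static final String BARRIER = "BARRIER";

    static List<Object> run(List<Object> input) {
        List<Object> downstream = new ArrayList<>();
        Collector<Integer> out = new Collector<Integer>() {
            @Override
            public void collect(Integer record) {
                downstream.add(record);
            }

            @Override
            public void close() {
                // Invoked by the framework side of this toy model, never by user code.
            }
        };
        DropNegatives fn = new DropNegatives();

        for (Object item : input) {
            if (BARRIER.equals(item)) {
                // The barrier is passed on regardless of whether the previous
                // record resulted in a collect() call.
                downstream.add(BARRIER);
            } else {
                fn.processElement((Integer) item, out);
            }
        }
        out.close(); // the framework side closes the collector, not processElement
        return downstream;
    }
}
```

Calling ToyOperator.run(Arrays.<Object>asList(1, -2, "BARRIER", 3)) returns [1, BARRIER, 3]: the -2 is dropped silently, and the barrier still goes through right behind it.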
