Correctly implementing of SourceFunction.run()

Senthil Kumar

I am implementing a source function that periodically wakes up and consumes data from S3.

My current implementation is shown below. It is modeled on org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.

Is it safe to simply swallow any and all exceptions in the run() method and rely only on the this.isRunning variable to exit run()?

Cheers
Kumar

 

---

 

@Override
public void cancel() {
    // isRunning is a volatile field, initialized to true in the class
    this.isRunning = false;
}

@Override
public void run(SourceFunction.SourceContext<OUT> ctx) {
    while (this.isRunning) {
        try {
            OUT out = /* Do some work */
            ctx.collect(out);

            Thread.sleep(this.sleepIntervalHours * 60 * 60 * 1000); // Hours to milliseconds
        } catch (Throwable t) {
            // Simply swallow
        }
    }
}
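
A side note on the sleep interval in the code above. The post does not show the type of sleepIntervalHours. Assuming it is an int, the product is computed in int arithmetic and overflows for intervals of 597 hours or more, which yields a negative argument. Thread.sleep rejects negative values with an IllegalArgumentException, and the catch (Throwable t) block would swallow that too, so the loop would spin without sleeping. A minimal sketch of the arithmetic (the class and method names are hypothetical):

```java
import java.util.concurrent.TimeUnit;

class SleepInterval {
    // Mirrors the expression in the post, assuming the field is an int.
    static long naiveMillis(int hours) {
        return hours * 60 * 60 * 1000; // int arithmetic; widened to long only afterwards
    }

    // TimeUnit performs the conversion in long arithmetic.
    static long safeMillis(int hours) {
        return TimeUnit.HOURS.toMillis(hours);
    }

    public static void main(String[] args) {
        System.out.println(naiveMillis(24));  // 86400000
        System.out.println(safeMillis(24));   // 86400000
        System.out.println(naiveMillis(597)); // -2145767296 (overflowed)
        System.out.println(safeMillis(597));  // 2149200000
    }
}
```

If the field is already a long, the original expression is fine as written.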

 

Re: Correctly implementing of SourceFunction.run()

Jingsong Li
Hi,

Some suggestions from my side:
- Do the work and call ctx.collect() inside a synchronized (checkpointLock) block?
- Move Thread.sleep(interval) out of the try/catch? You probably should not swallow the InterruptedException (raised, for example, when the job is cancelled).
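
The two suggestions can be sketched together as follows. This is only an illustration under stated assumptions: DemoContext is a stand-in that mirrors the collect() and getCheckpointLock() methods of Flink's SourceContext so that the example runs without Flink on the classpath, and PeriodicSource, fetch() and the demo harness are hypothetical names, not code from the original post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** Stand-in for the two SourceContext methods used below; not the Flink interface. */
interface DemoContext<T> {
    void collect(T element);
    Object getCheckpointLock();
}

/** Hypothetical periodic source; fetch() stands in for the S3 read. */
class PeriodicSource {
    private volatile boolean isRunning = true;
    private final long sleepMillis;
    private int fetched = 0;

    PeriodicSource(long sleepMillis) {
        this.sleepMillis = sleepMillis;
    }

    private String fetch() {
        return "record-" + fetched++;
    }

    public void run(DemoContext<String> ctx) throws InterruptedException {
        while (isRunning) {
            try {
                // Do the work and emit while holding the checkpoint lock.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(fetch());
                }
            } catch (Exception e) {
                // Failures of the work itself are still swallowed, as in the question.
            }
            // Outside the try/catch, so an interrupt propagates out of run().
            Thread.sleep(sleepMillis);
        }
    }

    public void cancel() {
        isRunning = false;
    }
}

class PeriodicSourceDemo {
    /** Runs the source on a thread, interrupts it, and reports what happened. */
    static String demo() {
        List<String> out = new ArrayList<>();
        Object lock = new Object();
        CountDownLatch firstRecord = new CountDownLatch(1);
        DemoContext<String> ctx = new DemoContext<String>() {
            public void collect(String element) {
                out.add(element);
                firstRecord.countDown();
            }
            public Object getCheckpointLock() {
                return lock;
            }
        };
        PeriodicSource source = new PeriodicSource(60_000);
        boolean[] interrupted = {false};
        Thread worker = new Thread(() -> {
            try {
                source.run(ctx);
            } catch (InterruptedException e) {
                interrupted[0] = true;
            }
        });
        try {
            worker.start();
            firstRecord.await();  // wait for the first emitted record
            worker.interrupt();   // simulate the interrupt that accompanies cancellation
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return out + " interrupted=" + interrupted[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [record-0] interrupted=true
    }
}
```

With the sleep inside a catch (Throwable t) block, the same interrupt would be swallowed and the loop would start another iteration.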

Best,
Jingsong Lee

In reply to Senthil Kumar's post of Fri, May 8, 2020 at 2:52 AM.

Re: Correctly implementing of SourceFunction.run()

rmetzger0
Hey Kumar,

If you swallow any and all exceptions, your Flink job will not fail because of problems arising in your custom source. It might make sense to stop the source when you catch an InterruptedException.

Throwing an exception out of the run() method signals to the Flink framework that the source has failed, so the job will fail or go into recovery.
Your use of the cancel() method together with the isRunning variable is the correct way to get proper cancellation behavior for the source.
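
The difference between swallowing and propagating can be illustrated without Flink. In this sketch everything is hypothetical: Work stands in for the S3 read, DemoSource for the custom source, and PolicyDemo.outcome() plays the role of the framework by treating an exception that escapes the loop as a failure. It is not how Flink invokes a source internally.

```java
import java.io.IOException;

/** Hypothetical unit of work standing in for the S3 read; not a Flink type. */
interface Work {
    void doOnce() throws Exception;
}

class DemoSource {
    private volatile boolean isRunning = true;
    int swallowed = 0;

    void cancel() {
        isRunning = false;
    }

    /** Swallows work failures, but stops on an interrupt or after cancel(). */
    void runSwallowing(Work work) {
        while (isRunning) {
            try {
                work.doOnce();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag for the caller
                return;                             // stop the source
            } catch (Exception e) {
                swallowed++;                        // keep going; nothing reaches the caller
            }
        }
    }

    /** Lets failures escape, so the caller sees the source as failed. */
    void runPropagating(Work work) throws Exception {
        while (isRunning) {
            work.doOnce();
        }
    }
}

class PolicyDemo {
    /** Stand-in for the framework: an exception escaping the loop counts as a failure. */
    static String outcome(boolean propagate) {
        DemoSource source = new DemoSource();
        int[] calls = {0};
        Work work = () -> {
            calls[0]++;
            if (calls[0] <= 2) {
                throw new IOException("simulated S3 failure");
            }
            if (calls[0] == 3) {
                throw new InterruptedException("simulated cancellation");
            }
        };
        try {
            if (propagate) {
                source.runPropagating(work);
            } else {
                source.runSwallowing(work);
            }
            return "returned normally after " + calls[0] + " calls, swallowed=" + source.swallowed;
        } catch (Exception e) {
            return "failed after " + calls[0] + " calls: " + e.getMessage();
        } finally {
            Thread.interrupted(); // clear the flag that runSwallowing restored
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(false)); // returned normally after 3 calls, swallowed=2
        System.out.println(outcome(true));  // failed after 1 calls: simulated S3 failure
    }
}
```

In the swallowing variant the two simulated failures never reach the caller, and only the interrupt ends the loop.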



In reply to Jingsong Li's post of Fri, May 8, 2020 at 3:31 AM.

Re: Correctly implementing of SourceFunction.run()

Senthil Kumar

OK, thank you. Much appreciated.

Yes, I don’t want the job to fail. The source emits very little data, which is fed into a broadcast stream.

 

In reply to Robert Metzger's post of Friday, May 8, 2020 at 9:51 AM.