Checkpointing when reading from files?

Checkpointing when reading from files?

NEKRASSOV, ALEXEI

I want to add checkpointing to my program that reads from a set of files in a directory. Without checkpointing I use readFile():

 

              DataStream<String> text = env.readFile(
                      new TextInputFormat(new Path(inputPath)),
                      inputPath,
                      inputProcessingMode,
                      1000);

 

Should I use ContinuousFileMonitoringFunction / ContinuousFileReaderOperator to add checkpointing? Or is there an easier way?

 

How do I go from splits (that ContinuousFileMonitoringFunction provides) to actual strings? I’m not clear how ContinuousFileReaderOperator can be used.

 

              DataStreamSource<TimestampedFileInputSplit> split = env.addSource(
                      new ContinuousFileMonitoringFunction<String>(
                              new TextInputFormat(new Path(inputPath)),
                              inputProcessingMode,
                              1,
                              1000)
              );

 

Thanks,
Alex


Re: Checkpointing when reading from files?

Amit Jain
Hi Alex,

StreamExecutionEnvironment#readFile is a helper function that creates a file-reading streaming source. It uses
ContinuousFileReaderOperator and ContinuousFileMonitoringFunction internally.

Since both the file reader operator and the monitoring function participate in checkpointing, and readFile uses them internally [1], you can go with the first approach.
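
For example, here is a minimal sketch of that first approach (assuming the usual Flink streaming imports, and that this sits where you build your job; inputPath, inputProcessingMode and the intervals are the same placeholders as in your snippet):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing on the environment; both the monitoring function and the
// split reader inside readFile() snapshot their state as part of each checkpoint.
env.enableCheckpointing(60000); // e.g. checkpoint every 60 seconds

// Same readFile() call as before; no other change is needed for checkpointing.
DataStream<String> text = env.readFile(
        new TextInputFormat(new Path(inputPath)),
        inputPath,
        inputProcessingMode,
        1000);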

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readFile-org.apache.flink.api.common.io.FileInputFormat-java.lang.String-org.apache.flink.streaming.api.functions.source.FileProcessingMode-long-org.apache.flink.api.common.typeinfo.TypeInformation-


--
Thanks,
Amit



Re: Checkpointing when reading from files?

Padarn Wilson-2
I'm a bit confused about this too, actually. I think the above works as a solution if you want to continuously monitor a directory, but with a "PROCESS_ONCE" readFile source I don't think you will get a checkpoint emitted indicating the end of the stream.

Trying to dig into the Java code, I found this:

case PROCESS_ONCE:
    synchronized (checkpointLock) {

        // the following check guarantees that if we restart
        // after a failure and we managed to have a successful
        // checkpoint, we will not reprocess the directory.

        if (globalModificationTime == Long.MIN_VALUE) {
            monitorDirAndForwardSplits(fileSystem, context);
            globalModificationTime = Long.MAX_VALUE;
        }
        isRunning = false;
    }
    break;
My understanding is that no checkpoints can be created while the file directory is being read, and that once it has been read, the isRunning flag is set to false, which means no new checkpoints are emitted.
Is this correct? If so, is it possible to somehow force a checkpoint to be emitted on completion of the source?



Re: Checkpointing when reading from files?

Fabian Hueske-2
Hi,

The continuous file source is split into two components: 1) a split generator that monitors the directory and generates splits whenever a new file is observed, and 2) reader tasks that receive the splits and read the referenced files.

I think the snippet you posted is the code that generates the input splits, which are distributed to the reader tasks.
In PROCESS_ONCE mode, all files are listed once, splits are generated and forwarded to the reader tasks.
At that point the split generator can stop, because it has done all its work. The reader tasks have received all splits and maintain them in state.
When the job needs to recover, the split generator won't re-emit splits, because they are already checkpointed by the reader tasks.
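
@Alex: regarding your question about getting from splits to actual strings, below is a rough sketch of how readFile() wires these two pieces together (based on the Flink 1.4 sources rather than the exact code; the operator names are made up, and BasicTypeInfo.STRING_TYPE_INFO stands in for the extracted type information):

TextInputFormat format = new TextInputFormat(new Path(inputPath));

// 1) Split generator: a non-parallel source that monitors the directory and
//    emits one TimestampedFileInputSplit per discovered split.
ContinuousFileMonitoringFunction<String> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(
                format, inputProcessingMode, env.getParallelism(), 1000);

// 2) Reader tasks: an operator that receives each split and reads the actual
//    records (Strings, for TextInputFormat) from the referenced file.
ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(format);

DataStream<String> text = env
        .addSource(monitoringFunction, "directory-monitor") // runs with parallelism 1
        .transform("split-reader", BasicTypeInfo.STRING_TYPE_INFO, reader);

Both pieces checkpoint their state (the monitoring function its modification time, the reader operator its pending splits), which is why the plain readFile() variant already gives you checkpointing.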

@Padarn: There is no way to force a checkpoint from within an application. Checkpoints are triggered by the JobManager.

Best, Fabian
