I want to add checkpointing to my program that reads from a set of files in a directory. Without checkpointing I use readFile():

    DataStream<String> text = env.readFile(
        new TextInputFormat(new Path(inputPath)),
        inputPath,
        inputProcessingMode,
        1000);

Should I use ContinuousFileMonitoringFunction / ContinuousFileReaderOperator to add checkpointing? Or is there an easier way?

How do I go from splits (that ContinuousFileMonitoringFunction provides) to actual strings? I’m not clear how ContinuousFileReaderOperator can be used.

    DataStreamSource<TimestampedFileInputSplit> split =
        env.addSource(
            new ContinuousFileMonitoringFunction<String>(
                new TextInputFormat(new Path(inputPath)),
                inputProcessingMode,
                1,
                1000));

Thanks,
Alex |
Hi Alex,
StreamExecutionEnvironment#readFile is a helper function that creates a file-reading data stream source. It uses ContinuousFileReaderOperator and ContinuousFileMonitoringFunction internally. Because both the file reader operator and the monitoring function support checkpointing, so does readFile [1], and you can go with the first approach.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readFile-org.apache.flink.api.common.io.FileInputFormat-java.lang.String-org.apache.flink.streaming.api.functions.source.FileProcessingMode-long-org.apache.flink.api.common.typeinfo.TypeInformation-

--
Thanks,
Amit

On Mon, May 21, 2018 at 9:39 PM, NEKRASSOV, ALEXEI <[hidden email]> wrote: |
I'm a bit confused about this too, actually. I think the above would work as a solution if you want to continuously monitor a directory, but for a "PROCESS_ONCE" readFile source I don't think you will get a checkpoint emitted indicating the end of the stream.

My understanding of this is that there can be no checkpoints created while the file directory

Trying to dig into the Java code I found this:

    case PROCESS_ONCE:
On Tue, May 22, 2018 at 3:24 AM Amit Jain <[hidden email]> wrote: Hi Alex, |
Hi,

The continuous file source is split into two components:
1) A split generator that monitors a directory and generates splits when a new file is observed, and
2) reading tasks that receive splits and read the referenced files.

2018-05-27 14:52 GMT+02:00 Padarn Wilson <[hidden email]>:
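To make the two-component split described above more concrete, here is a minimal sketch in plain Java. It is not Flink code and does not use any Flink API: the class names (TwoPartFileSource, Monitor, Reader, Split) and the choice of what each part keeps as state are assumptions made only for illustration. The monitor remembers the newest modification time it has already emitted, and the reader keeps the splits it has received but not yet read. Those two fields are what a checkpoint would need to capture in a design like this one.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Toy model (NOT Flink code) of a file source split into a monitor and a reader. */
class TwoPartFileSource {

    /** Hypothetical split: one file plus its modification time. */
    static final class Split {
        final Path file;
        final long modTime;
        Split(Path file, long modTime) { this.file = file; this.modTime = modTime; }
    }

    /** Component 1: scans a directory and emits splits for files not seen before. */
    static final class Monitor {
        // Assumed checkpoint state: newest modification time already emitted.
        long maxProcessedModTime = Long.MIN_VALUE;

        List<Split> scan(Path dir) throws IOException {
            List<Split> fresh = new ArrayList<>();
            try (Stream<Path> files = Files.list(dir)) {
                List<Path> sorted = files.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
                for (Path f : sorted) {
                    long t = Files.getLastModifiedTime(f).toMillis();
                    if (t > maxProcessedModTime) {
                        fresh.add(new Split(f, t));
                    }
                }
            }
            // Advance the watermark only after the whole scan, so files sharing a timestamp are all emitted.
            for (Split s : fresh) {
                maxProcessedModTime = Math.max(maxProcessedModTime, s.modTime);
            }
            return fresh;
        }
    }

    /** Component 2: receives splits and turns the referenced files into lines. */
    static final class Reader {
        // Assumed checkpoint state: splits received but not yet read.
        final Deque<Split> pending = new ArrayDeque<>();

        void receive(List<Split> splits) { pending.addAll(splits); }

        List<String> readAll() throws IOException {
            List<String> out = new ArrayList<>();
            while (!pending.isEmpty()) {
                out.addAll(Files.readAllLines(pending.poll().file));
            }
            return out;
        }
    }

    /** Runs one monitor pass followed by one read pass and returns the lines found. */
    static List<String> poll(Monitor m, Reader r, Path dir) throws IOException {
        r.receive(m.scan(dir));
        return r.readAll();
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("input");
        Files.write(dir.resolve("a.txt"), Arrays.asList("one", "two"));
        Monitor m = new Monitor();
        Reader r = new Reader();
        System.out.println(poll(m, r, dir)); // lines of a.txt
        System.out.println(poll(m, r, dir)); // empty: nothing new since the last scan
    }
}
```

In this sketch, restoring after a failure would amount to creating a new Monitor with the saved maxProcessedModTime and a new Reader with the saved pending splits, so already-emitted files are not picked up again.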
|