I'm building a streaming app that continuously monitors a directory for new
files, and I'm confused about why I have to specify a TextInputFormat (see the
source code below). It seems redundant, but it is a required parameter. It
makes perfect sense to specify the directory I want to monitor, but what
purpose does the TextInputFormat serve, and what should I set it to?

Example: a simple word count app that reads lines of text.
    TextInputFormat format = new TextInputFormat(
        new org.apache.flink.core.fs.Path("file:///tmp/dir/"));
    DataStream<String> inputStream = env.readFile(
        format,
        "file:///tmp/dir/",
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        100);
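Also, the readFile overload that takes a FilePathFilter (the second snippet, at the end of this message) is flagged as deprecated. The deprecation seems to be tied to the FilePathFilter argument.
How would I specify a filter so that only certain files are picked up, e.g. files whose names start with "Log"?
I'm using Flink 1.5.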
Thanks!
    DataStream<String> inputStream =
        env.readFile(format,
            "file:///home/hadoop/code/Flink1/etc/run/stream/file/monitordir/",
            processingMode,
            1000,
            FilePathFilter.createDefaultFilter());
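To make the filtering requirement concrete, here is a plain-Java sketch of the decision I want applied to each file. It does not use any Flink classes: LogPrefixFilter, fileName and shouldSkip are hypothetical names I made up for illustration. My reading of the Javadoc is that FilePathFilter.filterPath(Path) returns true for paths that should be ignored, and that the deprecation note points to FileInputFormat#setFilesFilter as the replacement, but I have not verified either against 1.5, so please treat both as assumptions.

```java
// Plain-Java sketch of the filter logic only; LogPrefixFilter and its
// methods are hypothetical names and are not part of Flink.
final class LogPrefixFilter {

    // Returns the last path segment, e.g. "Log1.txt" for "file:///tmp/dir/Log1.txt".
    static String fileName(String path) {
        int slash = path.lastIndexOf('/');
        return slash < 0 ? path : path.substring(slash + 1);
    }

    // True when the file should be skipped, i.e. its name does not start with "Log".
    // A path ending in '/' has an empty name and is therefore skipped as well.
    static boolean shouldSkip(String path) {
        return !fileName(path).startsWith("Log");
    }

    public static void main(String[] args) {
        String[] candidates = {
            "file:///tmp/dir/Log-2018-06-01.txt",
            "file:///tmp/dir/notes.txt"
        };
        for (String p : candidates) {
            System.out.println(p + " -> " + (shouldSkip(p) ? "skip" : "process"));
        }
    }
}
```

If my assumption about filterPath is right, the body of shouldSkip is what I would expect to put inside a custom FilePathFilter subclass.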