Hello guys,
thank you for all the effort around Flink and the support you are providing. I have a simple question (from someone new to Flink): I wrote a simple program that continuously reads CSV files from a folder monitored by Flink:
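import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class TextFromDirStream {
    //
    // Program
    //
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
        // monitor directory, checking for new files
        TextInputFormat format = new TextInputFormat(
                new org.apache.flink.core.fs.Path("file:///tmp/dir/"));
        DataStream<String> inputStream = env.readFile(
                format,
                "file:///tmp/dir/",
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                100, // directory scan interval in milliseconds
                FilePathFilter.createDefaultFilter());
        // print every line that was read
        inputStream.print();
        // execute program
        env.execute();
    }
}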
Now I want to extend this code to parse these files as CSV and emit each record as a tuple (a simple Tuple2<String, Integer>). I'm a bit lost in the documentation: I don't know whether I need to change the "format" object, change the call to readFile, or change both. What should my next step be?
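To make the goal concrete, here is a minimal sketch of the per-line conversion in plain Java, assuming every line has exactly two comma-separated columns (a string and an integer) with no quoting or header row. CsvLineParser is a hypothetical helper name, not a Flink class, and Map.Entry stands in for Tuple2 so the sketch has no Flink dependency.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper (not part of Flink): parses one "name,count" CSV line.
final class CsvLineParser {

    private CsvLineParser() {}

    // Assumes exactly two columns and no quoted fields.
    // Throws IllegalArgumentException when the line has no comma, and
    // NumberFormatException when the second column is not an integer.
    static Map.Entry<String, Integer> parse(String line) {
        String[] fields = line.split(",", 2);
        if (fields.length != 2) {
            throw new IllegalArgumentException("Expected 2 columns: " + line);
        }
        String name = fields[0].trim();
        int count = Integer.parseInt(fields[1].trim());
        return new SimpleImmutableEntry<>(name, count);
    }
}
```

In the job above, a function like this could probably be called from a map step on inputStream, wrapping the result with Tuple2.of(...) to get a DataStream of Tuple2<String, Integer>. That wiring is left out here, and whether a dedicated CSV input format would be a better fit is exactly the open question.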
BR,
Belisair