Read csv on datastream

belisair
Hello guys,

Thank you for all the effort around Flink and the support you provide. I have a simple question (from someone new to Flink): I wrote a simple program that continuously reads CSV files from a folder monitored by Flink:

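import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class TextFromDirStream {

    public static void main(String[] args) throws Exception {

        // set up the execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // input format that reads each file line by line
        TextInputFormat format = new TextInputFormat(new Path("file:///tmp/dir/"));

        // monitor the directory, checking for new files every 100 ms
        DataStream<String> inputStream = env.readFile(
                format,
                "file:///tmp/dir/",
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                100,
                FilePathFilter.createDefaultFilter());

        // print every line as a String
        inputStream.print();

        // execute program
        env.execute();
    }
}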

Now I want to extend this code to read these files as CSV and turn each record into a tuple (a simple Tuple2<String, Integer>). I'm a bit lost in the documentation and don't know whether I need to change the "format" object, change the readFile call, or both. What should my next step be?
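One possible direction, as a minimal sketch rather than the definitive approach: keep the TextInputFormat and the directory monitoring as they are, and add a flatMap that turns each line into a Tuple2<String, Integer>. It assumes each line has exactly two comma-separated columns (a name and an integer) with no quoted fields; the class name CsvFromDirStream and the helpers parseLine and CsvLineParser are hypothetical names for illustration. Lines that do not fit (for example a header row) are dropped instead of failing the job.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.util.Collector;

// Hypothetical example class; assumes lines of the form "name,count".
public class CsvFromDirStream {

    // Parses one CSV line into a Tuple2. Returns null when the line does not
    // have exactly two fields or the second field is not an integer.
    public static Tuple2<String, Integer> parseLine(String line) {
        String[] fields = line.split(",");
        if (fields.length != 2) {
            return null;
        }
        try {
            return new Tuple2<>(fields[0].trim(), Integer.parseInt(fields[1].trim()));
        } catch (NumberFormatException e) {
            return null; // e.g. a header line such as "name,count"
        }
    }

    // A named class (not a lambda) so Flink can extract the
    // Tuple2<String, Integer> output type without a returns(...) hint.
    public static class CsvLineParser
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            Tuple2<String, Integer> record = parseLine(line);
            if (record != null) {
                out.collect(record); // malformed lines are silently dropped
            }
        }
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        String dir = "file:///tmp/dir/";
        TextInputFormat format = new TextInputFormat(new Path(dir));
        // Same filter as the original readFile call, set on the format instead.
        format.setFilesFilter(FilePathFilter.createDefaultFilter());

        // Unchanged directory monitoring: one String per line, checked every 100 ms.
        DataStream<String> lines = env.readFile(
                format, dir, FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

        // The only new step: String -> Tuple2<String, Integer>.
        DataStream<Tuple2<String, Integer>> tuples = lines.flatMap(new CsvLineParser());
        tuples.print();

        env.execute("CSV from directory");
    }
}
```

With this variant only the pipeline changes, not the source. Alternatively, the format object itself could be swapped for TupleCsvInputFormat (built with a TupleTypeInfo of String and Integer) and passed to the readFile overload that takes a TypeInformation argument; that variant is not shown here.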

BR,
Belisair