DataStream csv reading

DataStream csv reading

drystan mazur
Hello, I am reading a CSV file with Flink 1.1.2. The file loads and the job runs, but print() shows no output:
<code>
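import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple9;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OilPriceJob { // hypothetical class name; the original snippet had no wrapper

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        System.out.println(env);

        Path oilPath = new Path("/quickstart/test_data/oil_price.csv");

        // Nine String columns, one per CSV field.
        BasicTypeInfo<?>[] types = {
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
        TupleTypeInfo<Tuple9<String, String, String, String, String, String, String, String, String>> oilDataTypes =
                new TupleTypeInfo<>(types);

        // Records separated by "\n", fields separated by ",".
        TupleCsvInputFormat<Tuple9<String, String, String, String, String, String, String, String, String>> oilDataIn =
                new TupleCsvInputFormat<>(oilPath, "\n", ",", oilDataTypes);

        // Reuse the same type information for the stream instead of building a second raw TupleTypeInfo.
        DataStream<Tuple9<String, String, String, String, String, String, String, String, String>> oilDataStream =
                env.createInput(oilDataIn, oilDataTypes);

        // Attach a print sink to the stream.
        oilDataStream.print();

        env.execute("Flink Java API Skeleton");
    }
}
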
</code>
The code runs OK; I just want to see the contents of the DataStream. What am I doing wrong? Thanks

Re: DataStream csv reading

Greg Hogan
The program executes when you call print (same for collect), which is why you are seeing an error when calling execute (since there is no new job to execute). As Fabian noted, you'll need to look in the TaskManager log files for the printed output if running on a cluster.

(In reply to drystan mazur, Thu, Oct 6, 2016 at 4:21 PM, sent from the Apache Flink User Mailing List archive at Nabble.com.)


Re: DataStream csv reading

Fabian Hueske-2
Hi Greg,

print() is only executed eagerly in DataSet programs.
In the DataStream API, print() just appends a print sink, and execute() is required to trigger execution.
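A simplified model of the two behaviours may help. It uses only the Java standard library, and the classes `PrintSemantics`, `LazyStream` and `EagerDataSet` are hypothetical illustrations, not Flink API: the streaming-style `print()` only records a sink and nothing is emitted until `execute()` runs it, while the batch-style `print()` produces its output immediately. Output is collected into a list rather than stdout so it can be checked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical toy model of lazy (DataStream-style) vs eager (DataSet-style) print(). Not Flink code. */
public class PrintSemantics {

    /** Streaming-style: print() only registers a sink; execute() runs every registered sink. */
    static final class LazyStream {
        private final List<String> elements;
        private final List<Consumer<List<String>>> sinks = new ArrayList<>();

        LazyStream(List<String> elements) { this.elements = elements; }

        void print(List<String> out) {
            // Record the sink; nothing is emitted yet.
            sinks.add(data -> out.addAll(data));
        }

        void execute() {
            // With no sinks registered there is nothing to run.
            if (sinks.isEmpty()) throw new IllegalStateException("no sinks defined");
            for (Consumer<List<String>> sink : sinks) sink.accept(elements);
        }
    }

    /** Batch-style: print() runs the computation and emits the result right away. */
    static final class EagerDataSet {
        private final List<String> elements;

        EagerDataSet(List<String> elements) { this.elements = elements; }

        void print(List<String> out) { out.addAll(elements); }
    }

    public static void main(String[] args) {
        List<String> streamOut = new ArrayList<>();
        LazyStream stream = new LazyStream(Arrays.asList("a", "b"));
        stream.print(streamOut);
        System.out.println("after print(): " + streamOut);   // after print(): []
        stream.execute();
        System.out.println("after execute(): " + streamOut); // after execute(): [a, b]

        List<String> batchOut = new ArrayList<>();
        new EagerDataSet(Arrays.asList("a", "b")).print(batchOut);
        System.out.println("batch print(): " + batchOut);    // batch print(): [a, b]
    }
}
```

In real Flink the DataStream print sink writes to the stdout of the process running the task (the IDE console in local mode, the TaskManager's `.out` file on a cluster), whereas DataSet print() collects the result back to the client and prints it there.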


(In reply to Greg Hogan, 2016-10-06 22:40 GMT+02:00)


Re: DataStream csv reading

drystan mazur
In reply to this post by Greg Hogan
Hi Greg, the program runs without exceptions in the IDE, but I can't see any output from the DataStream print() call:
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-730543706]
10/06/2016 14:47:13	Job execution switched to status RUNNING.
10/06/2016 14:47:13	Source: Custom File source(1/1) switched to SCHEDULED 
10/06/2016 14:47:13	Source: Custom File source(1/1) switched to DEPLOYING 
10/06/2016 14:47:13	FileSplitReader_Custom File source -> Sink: Unnamed(1/1) switched to SCHEDULED 
10/06/2016 14:47:13	FileSplitReader_Custom File source -> Sink: Unnamed(1/1) switched to DEPLOYING 
10/06/2016 14:47:13	FileSplitReader_Custom File source -> Sink: Unnamed(1/1) switched to RUNNING 
10/06/2016 14:47:13	Source: Custom File source(1/1) switched to RUNNING 
10/06/2016 14:47:14	Source: Custom File source(1/1) switched to FINISHED 
10/06/2016 14:47:14	FileSplitReader_Custom File source -> Sink: Unnamed(1/1) switched to FINISHED 
10/06/2016 14:47:14	Job execution switched to status FINISHED.
Thanks