Writing the results of the stream onto a CSV File

Abdul Salam Shaikh
Hi,

I am trying to write the results of my stream to a CSV file using the following code, but it does not compile:

DataStream<DetectorStatistics> objectStream = windowedStream.flatMap(new WindowObjectStreamTransformer());
objectStream.writeAsText("H:\\data.csv", new TextFormatter<DetectorStatistics>() {
    public String format(DetectorStatistics value) {
        return value.getDetectorId() + " ," + value.getActualPhaseTime();
    }
});

What am I doing wrong here? Or is there an alternative way to write records to a CSV file?

Thanks!

Re: Writing the results of the stream onto a CSV File

Fábio Dias
Hi,

Instead of using writeAsText, you can use writeAsCsv: https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html

You can call it with just the path string (as you do now), or also pass the overwrite flag if that suits your needs.
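A caveat worth checking against your Flink version: as far as I know, writeAsCsv only accepts streams of Flink tuple types, so a DataStream<DetectorStatistics> needs a map step first, either to a Tuple2 for writeAsCsv or to a ready-made String line for the single-argument writeAsText(path). Below is a minimal sketch of that per-record conversion in plain Java with no Flink dependency. The DetectorStatistics class here is a stand-in with assumed field types (String id, long phase time), and CsvLine is a hypothetical helper name.

```java
// Stand-in for the poster's class; the field types are assumptions.
final class DetectorStatistics {
    private final String detectorId;
    private final long actualPhaseTime;

    DetectorStatistics(String detectorId, long actualPhaseTime) {
        this.detectorId = detectorId;
        this.actualPhaseTime = actualPhaseTime;
    }

    String getDetectorId() { return detectorId; }
    long getActualPhaseTime() { return actualPhaseTime; }
}

// Hypothetical helper that turns one record into one CSV line.
final class CsvLine {
    private CsvLine() {}

    // Wrap a field in quotes only if it contains a comma, a quote, or a line break.
    static String escape(String field) {
        boolean needsQuotes = field.contains(",") || field.contains("\"")
                || field.contains("\n") || field.contains("\r");
        if (!needsQuotes) {
            return field;
        }
        // Embedded quotes are doubled, as is conventional for CSV.
        return "\"" + field.replace("\"", "\"\"") + "\"";
    }

    // One record becomes "detectorId,actualPhaseTime" without a trailing newline.
    static String format(DetectorStatistics value) {
        return escape(value.getDetectorId()) + "," + value.getActualPhaseTime();
    }

    public static void main(String[] args) {
        System.out.println(format(new DetectorStatistics("D1", 42L)));
    }
}
```

In a job, CsvLine.format could be called from a MapFunction<DetectorStatistics, String> placed ahead of the sink.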

Best Regards,
Fábio Dias.

(In reply to the message from Abdul Salam Shaikh <[hidden email]>, Friday, 28/04/2017 at 10:18.)

Re: Writing the results of the stream onto a CSV File

Till Rohrmann

Hi Abdul,

DataStream#writeAsText does not accept a TextFormatter as an argument. You either have to implement your own OutputFormat and pass it to DataStream#writeUsingOutputFormat or, as Fábio recommended, simply use DataStream#writeAsCsv.
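To give a rough idea of the first option: an OutputFormat is opened once, receives records one at a time through writeRecord, and is closed at the end. The sketch below mirrors that lifecycle in plain Java so it runs without Flink on the classpath. CsvRecordWriter is a hypothetical stand-alone class and does not implement Flink's interface. The real OutputFormat also has a configure step, and its open method takes task information rather than a Writer.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.function.Function;

// Hypothetical class: mirrors the open / writeRecord / close lifecycle only.
final class CsvRecordWriter<T> {
    private final Function<T, String> toLine; // converts one record to one CSV line
    private Writer out;

    CsvRecordWriter(Function<T, String> toLine) {
        this.toLine = toLine;
    }

    // Called once before any record is written.
    void open(Writer target) {
        this.out = target;
    }

    // Called once per record; appends the formatted line plus a newline.
    void writeRecord(T record) {
        if (out == null) {
            throw new IllegalStateException("open() must be called first");
        }
        try {
            out.write(toLine.apply(record));
            out.write('\n');
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Called once at the end; releases the underlying writer.
    void close() {
        if (out == null) {
            return;
        }
        try {
            out.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            out = null;
        }
    }

    public static void main(String[] args) {
        StringWriter buffer = new StringWriter();
        CsvRecordWriter<long[]> writer = new CsvRecordWriter<>(r -> r[0] + "," + r[1]);
        writer.open(buffer);
        writer.writeRecord(new long[] {1L, 42L});
        writer.writeRecord(new long[] {2L, 17L});
        writer.close();
        System.out.print(buffer);
    }
}
```

Keeping the record-to-line conversion as a separate function means the same formatting logic could be reused in a map step if writeAsText turns out to be enough.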

Cheers,
Till


(In reply to the message from Fábio Dias <[hidden email]>, Fri, Apr 28, 2017 at 11:46 AM.)