Unable to write stream as csv

Unable to write stream as csv

subashbasnet
Hello all,

I am able to write the Wikipedia edit data to Kafka and to a text file, as in the WikipediaAnalysis example. But when I try to write it as CSV, the empty files that are initially created never get filled with data. Here is the code:

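import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;

// keyedEdits is the KeyedStream<WikipediaEditEvent, String> from the WikipediaAnalysis example
DataStream<Tuple2<String, Long>> result = keyedEdits
    .timeWindow(Time.seconds(5))
    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
            acc.f0 = event.getUser();
            acc.f1 += event.getByteDiff();
            return acc;
        }
    });

// Alternative 1: works, the files are filled with data
result.writeAsText("file:///home/softwares/flink-1.0.0/kmeans/streaming/result", FileSystem.WriteMode.OVERWRITE);

// Alternative 2: doesn't work, the files are created but stay empty
result.writeAsCsv("file:///home/softwares/flink-1.0.0/kmeans/streaming/result", FileSystem.WriteMode.OVERWRITE);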
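To make clear what `result` should contain, here is a minimal plain-Java sketch (no Flink dependency) that replays the fold above for the events of a single 5-second window. `Edit` and `Acc` are hypothetical stand-ins for `WikipediaEditEvent` and `Tuple2<String, Long>`; the sketch assumes one fresh `("", 0L)` accumulator per user key, which is how a keyed window fold behaves.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FoldSketch {
    // Hypothetical stand-in for WikipediaEditEvent: only the fields the fold reads.
    static final class Edit {
        final String user;
        final int byteDiff;
        Edit(String user, int byteDiff) { this.user = user; this.byteDiff = byteDiff; }
    }

    // Hypothetical stand-in for Tuple2<String, Long>, starting at ("", 0L).
    static final class Acc {
        String f0 = "";
        long f1 = 0L;
        @Override public String toString() { return "(" + f0 + "," + f1 + ")"; }
    }

    // Replays keyBy(user) + fold for ONE window: each key gets a fresh accumulator,
    // each event sets f0 to the user and adds its byteDiff to f1.
    // Output order here is first-seen key order; Flink does not promise an order across keys.
    static List<String> foldWindow(List<Edit> windowEvents) {
        Map<String, Acc> perKey = new LinkedHashMap<>();
        for (Edit e : windowEvents) {
            Acc acc = perKey.computeIfAbsent(e.user, k -> new Acc());
            acc.f0 = e.user;
            acc.f1 += e.byteDiff;
        }
        List<String> out = new ArrayList<>();
        for (Acc acc : perKey.values()) {
            out.add(acc.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Edit> window = List.of(new Edit("alice", 120), new Edit("bob", -15), new Edit("alice", 30));
        System.out.println(foldWindow(window)); // [(alice,150), (bob,-15)]
    }
}
```

So each window should produce one record per user, which is what the text sink does write out.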

Why does the data get written to the file as text but not as CSV?

Best Regards,
Subash Basnet

Re: Unable to write stream as csv

subashbasnet
Hello there,

Could anyone help me understand why the result DataStream below gets written as text but not as CSV? Since the records are tuples, I expected text and CSV output to behave the same. No error is shown; the file simply never receives any data when result is written as CSV.

DataStream<Tuple2<String, Long>> result = keyedEdits
    .timeWindow(Time.seconds(5))
    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
            acc.f0 = event.getUser();
            acc.f1 += event.getByteDiff();
            return acc;
        }
    });

result.writeAsText(.....); // it is working
result.writeAsCsv(.....);  // it is not working
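Even when both sinks work, they do not format a tuple identically. As far as I know, writeAsText writes each record's toString() (for a Flink Tuple2 that is "(f0,f1)"), while writeAsCsv writes the fields joined by the field delimiter, "," by default, one record per line. The plain-Java sketch below illustrates the two line formats for one record; `TupleFormats`, `asTextLine` and `asCsvLine` are hypothetical helpers, not Flink API. A possible workaround along these lines, untested here against Flink 1.0, is to map each tuple to a CSV-formatted String yourself and write those strings with writeAsText, the path that already works.

```java
class TupleFormats {
    // Line a text sink writes for a (String, Long) record: the record's toString(),
    // which for Flink's Tuple2 has the form "(f0,f1)".
    static String asTextLine(String f0, long f1) {
        return "(" + f0 + "," + f1 + ")";
    }

    // Line a CSV sink writes for the same record: the fields joined by the
    // field delimiter, without parentheses ("," assumed, matching the default).
    static String asCsvLine(String f0, long f1, String fieldDelimiter) {
        return f0 + fieldDelimiter + f1;
    }

    public static void main(String[] args) {
        System.out.println(asTextLine("alice", 150L));     // (alice,150)
        System.out.println(asCsvLine("alice", 150L, ",")); // alice,150
    }
}
```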

Best Regards,
Subash Basnet

On Wed, Apr 27, 2016 at 4:14 PM, subash basnet wrote: [quoted message omitted; see above]

Re: Unable to write stream as csv

Fabian Hueske-2
Have you checked the log files as well?

2016-05-01 14:07 GMT+02:00 subash basnet wrote: [quoted messages omitted; see above]

Re: Unable to write stream as csv

Aljoscha Krettek
I think there is a problem with the interaction between legacy OutputFormats and streaming programs: flush() is never called, and since the CsvOutputFormat only writes in flush(), we don't see any results.
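The effect described above is ordinary output buffering: if records are held in memory and only pushed to the file on flush() (or close()), and nothing ever calls either, the file that was created stays empty. A minimal plain-Java sketch of that effect, using java.io.BufferedWriter over a StringWriter as a stand-in for the output file; this illustrates buffering in general and is not Flink's CsvOutputFormat code.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;

class FlushDemo {
    // Writes one CSV record through a buffered writer and returns what the
    // underlying "file" (a StringWriter) contains before and after flush().
    static String[] contentsBeforeAndAfterFlush() {
        StringWriter file = new StringWriter();                  // stand-in for the output file
        try (BufferedWriter out = new BufferedWriter(file, 8192)) {
            out.write("alice,150\n");                            // record lands in the in-memory buffer
            String beforeFlush = file.toString();                // still empty: nothing reached the file
            out.flush();                                         // push the buffered record to the file
            String afterFlush = file.toString();
            return new String[] {beforeFlush, afterFlush};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] r = contentsBeforeAndAfterFlush();
        System.out.println("before flush: [" + r[0] + "]");        // before flush: []
        System.out.println("after flush:  [" + r[1].trim() + "]"); // after flush:  [alice,150]
    }
}
```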

On Mon, 2 May 2016 at 11:59 Fabian Hueske wrote: [quoted messages omitted; see above]