Hello all,
I am able to write the Wikipedia edit data to Kafka and to a text file, following the given WikipediaAnalysis example. But when I try to write it as CSV, the blank files that are initially created never get filled with data. Below is the code:

    DataStream<Tuple2<String, Long>> result = keyedEdits
        .timeWindow(Time.seconds(5))
        .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
                acc.f0 = event.getUser();
                acc.f1 += event.getByteDiff();
                return acc;
            }
        });

    // works
    result.writeAsText("file:///home/softwares/flink-1.0.0/kmeans/streaming/result", FileSystem.WriteMode.OVERWRITE);

    // doesn't work
    result.writeAsCsv("file:///home/softwares/flink-1.0.0/kmeans/streaming/result", FileSystem.WriteMode.OVERWRITE);

Why is the data written to the file as text but not as CSV?

Best Regards,
Subash Basnet
Hello there,

Could anyone help me understand why the result DataStream below gets written as text, but not as CSV? Since it is in tuple format, I would expect it to behave the same for both text and CSV. No error is shown; it simply doesn't write to the file when the result is written as CSV.

    DataStream<Tuple2<String, Long>> result = keyedEdits
        .timeWindow(Time.seconds(5))
        .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
                acc.f0 = event.getUser();
                acc.f1 += event.getByteDiff();
                return acc;
            }
        });

    result.writeAsText(.....);   // It is working.
    result.writeAsCsv(.....);    // It is not working.

Best Regards,
Subash Basnet

On Wed, Apr 27, 2016 at 4:14 PM, subash basnet <[hidden email]> wrote:
Have you checked the log files as well?

2016-05-01 14:07 GMT+02:00 subash basnet <[hidden email]>:
I think there is a problem with the interaction of legacy OutputFormats and streaming programs. flush() is not called, and the CsvOutputFormat only writes in flush(), so we don't see any results.

On Mon, 2 May 2016 at 11:59 Fabian Hueske <[hidden email]> wrote:
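
To illustrate the kind of behaviour described in the reply above, here is a minimal sketch using only the Java standard library. It does not use Flink or CsvOutputFormat; the class name BufferedCsvDemo, the method sizesBeforeAndAfterFlush, and the sample record are made up for this example. It only shows the general effect: a record handed to a buffered writer stays in memory, and the file on disk remains empty, until flush() (or close()) is called.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class BufferedCsvDemo {

    /**
     * Writes one small CSV record through a buffered writer and returns the
     * size of the file on disk before and after flush() is called.
     * (Hypothetical helper for illustration; not part of Flink.)
     */
    static long[] sizesBeforeAndAfterFlush(Path file) throws IOException {
        try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
            // The record is far smaller than the writer's buffer,
            // so it is held in memory rather than written to the file.
            writer.write("someUser,42\n");
            long before = Files.size(file);

            // Only now are the buffered characters pushed to the file.
            writer.flush();
            long after = Files.size(file);

            return new long[] {before, after};
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("result", ".csv");
        long[] sizes = sizesBeforeAndAfterFlush(file);
        System.out.println("bytes before flush: " + sizes[0]);
        System.out.println("bytes after flush:  " + sizes[1]);
        Files.delete(file);
    }
}
```

If the writer in a sink is never flushed while the job keeps running, the output file would look exactly like the empty files reported in this thread, which is consistent with the explanation given, though the sketch says nothing about how Flink's sink is actually implemented.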