Hi,

I am running my Flink job in the local IDE and want to write the results to a CSV file using:

    stream.writeAsCsv("...", FileSystem.WriteMode.OVERWRITE).setParallelism(1)

While the file is created, it is empty inside. However, writeAsText works. I have checked the CsvOutputFormat and I think that I am not reaching the buffer size. Moreover, flush() is only called in the close() method, and I don't know when that method is invoked. Since I am reading my data from a Kafka source, the stream is infinite, so close() may never be reached.

Is there a way to flush the data earlier, for example within the writeRecord method?

Best regards,
Nico
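[The behavior Nico describes can be reproduced with a minimal sketch. This is NOT Flink's actual CsvOutputFormat, just a hypothetical illustration of the same pattern: records accumulate in a buffer and only reach the underlying stream once the buffer fills up or flush() runs, so a small, never-closed stream leaves the file empty.]

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical buffered writer (not Flink code): data is only written
// through to the underlying stream when the buffer fills or flush() runs.
class BufferedCsvWriter {
    private final OutputStream out;
    private final StringBuilder buffer = new StringBuilder();
    private final int bufferSize;

    BufferedCsvWriter(OutputStream out, int bufferSize) {
        this.out = out;
        this.bufferSize = bufferSize;
    }

    void writeRecord(String record) {
        buffer.append(record).append('\n');
        if (buffer.length() >= bufferSize) {
            flush(); // only happens once enough data has accumulated
        }
    }

    void flush() {
        try {
            out.write(buffer.toString().getBytes(StandardCharsets.UTF_8));
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        buffer.setLength(0);
    }
}

public class EmptyFileDemo {
    public static void main(String[] args) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        BufferedCsvWriter writer = new BufferedCsvWriter(sink, 4096);
        writer.writeRecord("a,b,c");
        // Buffer is far from full, so nothing has reached the sink yet:
        System.out.println("bytes before flush: " + sink.size()); // 0
        writer.flush();
        System.out.println("bytes after flush: " + sink.size());
    }
}
```

[In a streaming job over an infinite Kafka source, close() (and hence the final flush) may never be invoked, which is why the file stays empty even though records were "written".]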
Hi Nico,
writeAsCsv has limited functionality in this case. I recommend using the Bucketing File Sink [1], where you can specify an interval and a batch size that control when to flush.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/filesystem_sink.html#bucketing-file-sink

Timo

Am 27/01/17 um 11:06 schrieb Nico:
> I am running my Flink job in the local IDE and want to write the
> results in a csv file using:
>
> stream.writeAsCsv("...", FileSystem.WriteMode.OVERWRITE).setParallelism(1)
> [...]
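[Timo's suggestion boils down to a flush policy triggered by either batch size or elapsed time. Here is a minimal plain-Java sketch of that idea; this is NOT the BucketingSink API, and the class and method names are made up for illustration.]

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a bucketing sink's flush policy (not Flink code):
// pending records are flushed when either a batch-size threshold or a time
// interval since the last flush is exceeded.
class IntervalBatchSink {
    private final List<String> pending = new ArrayList<>();
    private final List<String> flushed = new ArrayList<>();
    private final int batchSize;       // flush threshold in records
    private final long intervalMillis; // flush threshold in elapsed time
    private long lastFlush;

    IntervalBatchSink(int batchSize, long intervalMillis, long now) {
        this.batchSize = batchSize;
        this.intervalMillis = intervalMillis;
        this.lastFlush = now;
    }

    void write(String record, long now) {
        pending.add(record);
        if (pending.size() >= batchSize || now - lastFlush >= intervalMillis) {
            flush(now);
        }
    }

    void flush(long now) {
        flushed.addAll(pending); // stands in for writing to the file system
        pending.clear();
        lastFlush = now;
    }

    int flushedCount() {
        return flushed.size();
    }
}

public class FlushPolicyDemo {
    public static void main(String[] args) {
        IntervalBatchSink sink = new IntervalBatchSink(3, 1000, 0);
        sink.write("a", 10);
        sink.write("b", 20);
        System.out.println("flushed after 2 records: " + sink.flushedCount()); // 0
        sink.write("c", 30);   // batch-size threshold reached
        System.out.println("flushed after 3 records: " + sink.flushedCount()); // 3
        sink.write("d", 2000); // time interval since last flush exceeded
        System.out.println("flushed after quiet period: " + sink.flushedCount()); // 4
    }
}
```

[With this policy, data becomes visible either once enough records have accumulated or once the interval elapses, so an infinite stream never has to wait for close().]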
Hi all,
when running a Flink batch job, from time to time a TaskManager dies randomly, which makes the whole job fail. All other nodes then throw the following exception:

    Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
    terminated due to an exception: Connection unexpectedly closed by
    remote task manager 'dyingnode' ...

However, there are no error messages in the log of 'dyingnode'. But in the JVM crash log (hs_err_pid file) on 'dyingnode' I found this:

    # A fatal error has been detected by the Java Runtime Environment:
    #
    # SIGBUS (0x7) at pc=0x00003fff701afa4c, pid=1119228, tid=0x00003ff38a3ff1b0
    #
    # JRE version: OpenJDK Runtime Environment (8.0_101-b14) (build 1.8.0_101-b14)
    # Java VM: OpenJDK 64-Bit Server VM (25.101-b14 mixed mode linux-ppc64 )
    # Problematic frame:
    # J 433 C2 org.apache.flink.runtime.util.DataOutputSerializer.write(I)V (40 bytes) @ 0x00003fff701afa4c [0x00003fff701afa00+0x4c]
    # ...

What can cause this? And is this Flink related?

Best regards,
Malte
Hi,

which Flink version are you using? This issue occurred quite frequently in the 1.2.0 RC0 and should be fixed in later RCs.

On Fri, Jan 27, 2017 at 4:13 PM, Malte Schwarzer <[hidden email]> wrote:
> Hi all, [...]