Datastream - writeAsCsv creates empty File


Datastream - writeAsCsv creates empty File

nico-2
Hi,

I am running my Flink job in the local IDE and want to write the results in a csv file using:

stream.writeAsCsv("...", FileSystem.WriteMode.OVERWRITE).setParallelism(1)

The file is created, but it stays empty. writeAsText, however, works. I have checked the CsvOutputFormat, and I think I am not reaching the buffer size. Moreover, flush() is only called in close(), but I don't know when close() is invoked. Since I am reading my data from a Kafka source, the stream is effectively infinite.

Is there a way to flush the data earlier, for example within the writeRecord method?
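The buffering behaviour described above is easy to reproduce with a plain java.io.BufferedWriter, independent of Flink: until the buffer fills or flush()/close() is called, the file on disk stays empty. A minimal sketch (class name, file name, and buffer size are arbitrary):

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class BufferDemo {

    // Writes one CSV line and returns the file size before and after flush().
    static long[] sizeBeforeAndAfterFlush() throws IOException {
        Path file = Files.createTempFile("records", ".csv");
        BufferedWriter out = new BufferedWriter(new FileWriter(file.toFile()), 8192);
        try {
            out.write("id,value\n");         // stays in the 8 KB buffer
            long before = Files.size(file);  // nothing has reached the disk yet
            out.flush();                     // push the buffered bytes to the file
            long after = Files.size(file);
            return new long[] {before, after};
        } finally {
            out.close();
            Files.delete(file);
        }
    }

    public static void main(String[] args) throws IOException {
        long[] sizes = sizeBeforeAndAfterFlush();
        System.out.println("before flush: " + sizes[0]
                + " bytes, after flush: " + sizes[1] + " bytes");
    }
}
```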

Best regards,
Nico

Re: Datastream - writeAsCsv creates empty File

Timo Walther
Hi Nico,

writeAsCsv has limited functionality in this case. I recommend using the
Bucketing File Sink [1], where you can specify an interval and batch
size that control when data is flushed.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/filesystem_sink.html#bucketing-file-sink
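For illustration, wiring up that sink might look roughly like this. This is only a sketch against the BucketingSink API described in [1]; the output path, the 1 MB batch size, and the toString-based formatting are placeholders, not recommendations:

```java
// Sketch, assuming the BucketingSink from the linked 1.2 docs;
// "/tmp/flink-out" and the 1 MB batch size are arbitrary placeholders.
BucketingSink<String> sink = new BucketingSink<>("/tmp/flink-out");
sink.setBatchSize(1024 * 1024); // roll to a new part file after ~1 MB

stream
    .map(Object::toString)      // real CSV formatting would go here
    .addSink(sink)
    .setParallelism(1);
```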

Timo


On 27/01/17 at 11:06, Nico wrote:

> Is there a way to flush the data earlier, for example within the
> writeRecord method?



TaskManager randomly dies

Malte Schwarzer-3
Hi all,

when running a Flink batch job, a TaskManager occasionally dies at
random, which causes the whole job to fail. All other nodes then throw
the following exception:

Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
terminated due to an exception: Connection unexpectedly closed by remote
task manager 'dyingnode' ...

However, there are no error messages in the log of 'dyingnode'.

But in the PID thread dump of 'dyingnode' I found this:

# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGBUS (0x7) at pc=0x00003fff701afa4c, pid=1119228,
tid=0x00003ff38a3ff1b0
#
# JRE version: OpenJDK Runtime Environment (8.0_101-b14) (build
1.8.0_101-b14)
# Java VM: OpenJDK 64-Bit Server VM (25.101-b14 mixed mode linux-ppc64 )
# Problematic frame:
# J 433 C2 org.apache.flink.runtime.util.DataOutputSerializer.write(I)V
(40 bytes) @ 0x00003fff701afa4c [0x00003fff701afa00+0x4c]
# ...

What can cause this? And is this Flink related?


Best regards,
Malte

Re: TaskManager randomly dies

rmetzger0
Hi,
which Flink version are you using?

This issue occurred quite frequently in the 1.2.0 RC0 and should be fixed in later RCs.

On Fri, Jan 27, 2017 at 4:13 PM, Malte Schwarzer <[hidden email]> wrote:

> when running a Flink batch job, from time to time a TaskManager dies
> randomly [...] What can cause this? And is this Flink related?