Hello,
I am working on a ParquetSink writer, which will convert a Kafka stream to Parquet format. I am having some weird issues deploying this application to a YARN cluster. I am not 100% sure this is a Flink-related error, but I wanted to reach out to folks here in case it might be.

Command:
flink run --jobmanager yarn-cluster -ytm 4096 -yjm 1048 -ys 2 -yn 2 -d -c <class_name> jar_name.jar

However, as soon as I try to submit a similar job to an already running YARN cluster, I start to get these errors (https://gist.github.com/neoeahit/f0130e9f447ea9c2baa38bf5ee4e6a57) and the application crashes. I checked the location in /tmp where I am creating the file, and no file exists there.

Command:
flink run -yid application_id -d -c <class_name> jar_name.jar

A bit more about my algorithm: I use a temp array to buffer messages in the @invoke method, and when specific thresholds are reached I create a Parquet file with this buffered data. Once a tmp Parquet file is created, I upload this file to long-term storage.

The code to write buffered data to a Parquet file is:
writer = Some(AvroParquetWriter.builder(getPendingFilePath(tmp_filename.get))

Thanking in advance,
- Vipul
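A minimal Scala sketch of the buffer-then-flush pattern described above, written without Flink or Parquet dependencies so it runs on its own. `BufferingWriter`, `threshold` and `flush` are hypothetical names, not Flink or Parquet APIs. In a real sink, `invoke` would be the sink's per-record method and `flush` would create the temp Parquet file and upload it.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: buffers records and hands a full batch to `flush`
// once `threshold` records have accumulated.
class BufferingWriter[T](threshold: Int, flush: Seq[T] => Unit) {
  private val buffer = ArrayBuffer.empty[T]

  // Called once per incoming record, like a sink's invoke().
  def invoke(value: T): Unit = {
    buffer += value
    if (buffer.size >= threshold) {
      flush(buffer.toList) // e.g. write a temp Parquet file, then upload it
      buffer.clear()       // start the next batch with an empty buffer
    }
  }
}
```

With a threshold of 3, feeding the records 1 to 7 flushes the batches (1, 2, 3) and (4, 5, 6), and record 7 stays buffered until more data arrives.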
This isn't related to Flink, but I might be able to help you out anyway.

Does the ParquetFileWriter set the 'overwrite' flag when calling 'FileSystem#create()'? My suspicion is that you create a file for the first batch and write it out, but do not delete it. For the next batch, the file cannot be created (since it still exists), so the write fails. Because the application then crashes, the /tmp directory probably gets cleaned up, which is why you don't see any leftover file.

To verify this theory, you can add a simple counter to your sink for the number of created files. It should succeed for the first batch and fail on the second one. In that case, you should make sure that the file is deleted after the first batch has been written.

On 03.10.2017 08:01, vipul singh wrote:
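The suspected failure mode can be reproduced in isolation. This Scala sketch uses java.nio as a stand-in for Hadoop's `FileSystem#create(path, overwrite)`. `Files.createFile` throws `FileAlreadyExistsException` when the file already exists, just as a create call with overwrite set to false would fail. The fixed temp file name, `writeBatch` and `runTwoBatches` are hypothetical. The returned count plays the role of the file counter suggested above.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{FileAlreadyExistsException, Files, Path}

// Hypothetical demo: java.nio stands in for Hadoop's FileSystem#create(path, overwrite).
object BatchFileDemo {

  // Writes one batch to `path`. With overwrite = false this mimics a
  // non-overwriting create(): Files.createFile throws if the file already exists.
  def writeBatch(path: Path, lines: Seq[String], overwrite: Boolean): Unit = {
    if (!overwrite) Files.createFile(path)
    // Default options here are CREATE, TRUNCATE_EXISTING, WRITE.
    Files.write(path, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))
  }

  // Runs two batches that reuse the same (hypothetical) temp file name and
  // returns how many files were created before the first failure.
  def runTwoBatches(deleteAfterUpload: Boolean): Int = {
    val dir = Files.createTempDirectory("parquet-sink-demo")
    val tmpFile = dir.resolve("batch.parquet")
    var createdFiles = 0 // the counter suggested in the reply
    try {
      for (batch <- 1 to 2) {
        writeBatch(tmpFile, Seq(s"record from batch $batch"), overwrite = false)
        createdFiles += 1
        // ... upload tmpFile to long-term storage here ...
        if (deleteAfterUpload) Files.delete(tmpFile)
      }
    } catch {
      // Raised on the second batch when the first file was left behind.
      case _: FileAlreadyExistsException => ()
    } finally {
      Files.deleteIfExists(tmpFile)
      Files.delete(dir)
    }
    createdFiles
  }

  def main(args: Array[String]): Unit = {
    println(s"without delete: ${runTwoBatches(deleteAfterUpload = false)} file(s) created")
    println(s"with delete: ${runTwoBatches(deleteAfterUpload = true)} file(s) created")
  }
}
```

Running `main` reports one file without the delete and two with it, matching the "succeeds on the first batch, fails on the second" pattern. Besides deleting the temp file after each upload, recent parquet-mr versions appear to let the writer builder overwrite existing files via `withWriteMode(ParquetFileWriter.Mode.OVERWRITE)`. It is worth confirming that this exists in the version on your classpath.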