Weird error in submitting a flink job to yarn cluster


vipul singh
Hello,

I am working on a ParquetSink writer, which converts a Kafka stream to Parquet format. I am having some weird issues deploying this application to a YARN cluster. I am not 100% sure this is a Flink-related error, but I wanted to reach out to folks here in case it might be.


If I launch Flink on YARN only to execute a single job, it runs fine. This is the command I use for the deployment:

Command: flink run --jobmanager yarn-cluster -ytm 4096 -yjm 1048 -ys 2 -yn 2 -d -c <class_name> jar_name.jar

However, as soon as I try to submit a similar job to an already running YARN cluster, I start to get these errors (https://gist.github.com/neoeahit/f0130e9f447ea9c2baa38bf5ee4e6a57) and the application crashes. I checked the location in /tmp where I create the file, and no file exists there.

Command: flink run -yid application_id -d -c <class_name> jar_name.jar 


A bit more about my algorithm: I use a temporary array to buffer messages in the invoke() method, and when specific thresholds are reached, I create a Parquet file with the buffered data. Once a temporary Parquet file is created, I upload it to long-term storage.
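The buffer-then-flush flow described above can be sketched in plain Scala. This is a minimal sketch, not the poster's code: `BatchBuffer` is a hypothetical helper, and the count-based threshold is an assumption (the post only mentions "specific thresholds").

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: collects records and hands back a full batch once
// `threshold` records have been buffered (count-based threshold is an assumption).
final class BatchBuffer[T](threshold: Int) {
  require(threshold > 0, "threshold must be positive")
  private val buffer = ArrayBuffer.empty[T]

  /** Adds one record; returns Some(batch) when the threshold is reached. */
  def add(record: T): Option[Vector[T]] = {
    buffer += record
    if (buffer.size >= threshold) {
      val batch = buffer.toVector
      buffer.clear()
      Some(batch)
    } else None
  }

  /** Returns and clears whatever is still buffered, e.g. when the sink closes. */
  def drain(): Vector[T] = {
    val rest = buffer.toVector
    buffer.clear()
    rest
  }
}
```

In a sink, invoke() would call add() for each record and, whenever a batch comes back, write it to the temporary Parquet file and upload that file; drain() covers records still buffered when the sink is closed.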

The code to write buffered data to a Parquet file is:

writer = Some(AvroParquetWriter.builder(getPendingFilePath(tmp_filename.get))
  .withSchema(schema.get)
  .withCompressionCodec(compressionCodecName)
  .withRowGroupSize(blockSize)
  .withPageSize(pageSize)
  .build())
bufferedMessages.foreach { e =>
  writer.get.write(e.payload)
}
writer.get.close()

Please do let me know.

Thanks in advance,
- Vipul



Re: Weird error in submitting a flink job to yarn cluster

Chesnay Schepler
This isn't related to Flink, but I might be able to help you out anyway.

Does the ParquetFileWriter set the 'overwrite' flag when calling 'FileSystem#create()'?

My suspicion is that you create a file for the first batch and write it out, but do not delete it.
For the next batch, the file cannot be created (since it still exists), and thus the write fails.
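The failure mode described here can be reproduced without Hadoop or Parquet. The sketch below uses java.nio purely as a stand-in (an assumption for illustration; it is not Hadoop's FileSystem API): StandardOpenOption.CREATE_NEW behaves like a create with overwrite disabled and refuses an existing file, while CREATE plus TRUNCATE_EXISTING behaves like an overwriting create. The name CreateVsOverwrite is hypothetical.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardOpenOption}

// Illustration only: java.nio stands in for FileSystem#create(path, overwrite).
object CreateVsOverwrite {
  /** Like create(path, overwrite = false): fails if the file already exists. */
  def createNew(path: Path, data: String): Unit =
    Files.write(path, data.getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE_NEW)

  /** Like create(path, overwrite = true): truncates and rewrites an existing file. */
  def createOrOverwrite(path: Path, data: String): Unit =
    Files.write(path, data.getBytes(StandardCharsets.UTF_8),
      StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE)

  /** Writes "batch 1", then returns true if a second create-only write is rejected. */
  def secondCreateFails(path: Path): Boolean = {
    createNew(path, "batch 1")
    try { createNew(path, "batch 2"); false }
    catch { case _: FileAlreadyExistsException => true }
  }
}
```

For the Parquet writer itself, two remedies follow: delete the temporary file once it has been uploaded, or give every batch a unique file name. Some parquet-mr versions also let the builder request an overwriting create via withWriteMode(ParquetFileWriter.Mode.OVERWRITE); check that your version provides it before relying on it.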

Since the application then crashes, the /tmp directory probably gets cleaned up, which is why you don't see
any leftover file.

To verify this theory, you can add a simple counter to your sink that tracks the number of created files. Writing should succeed
for the first batch and fail on the second one. If that is the case, make sure that the file is deleted after the first
batch has been written.
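Both suggestions, counting created files and deleting the file after each batch, can be combined in one flush routine. A minimal sketch under assumptions: TempFileFlusher, writeBatch and upload are hypothetical names standing in for the AvroParquetWriter code and the long-term storage client from the original post, and records are plain strings here.

```scala
import java.nio.file.{Files, Path}

// Hypothetical flush routine: `writeBatch` stands in for the AvroParquetWriter
// code and `upload` for the long-term storage client (both assumptions).
final class TempFileFlusher(
    pendingPath: Path,
    writeBatch: (Path, Seq[String]) => Unit,
    upload: Path => Unit) {

  // Counter from the reply: number of temp files the sink has tried to create.
  private var attempts = 0
  def createdFiles: Int = attempts

  def flush(batch: Seq[String]): Unit = {
    attempts += 1
    try {
      writeBatch(pendingPath, batch)
      upload(pendingPath)
    } finally {
      // Always remove the temp file so the next batch can reuse the same path.
      Files.deleteIfExists(pendingPath)
    }
  }
}
```

The file is deleted in a finally block, so a failed write or upload also removes it; otherwise a restarted task could run into the same "file already exists" error on its first batch. If the temporary file must survive a failed upload, delete it only after the upload succeeds and use a unique name per batch instead.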

In reply to vipul singh's message of 03.10.2017 08:01 (shown above).