Hi all,

We recently implemented a feature in our streaming Flink job in which we build an AvroParquetWriter every time the overridden "write" method from org.apache.flink.streaming.connectors.fs.Writer gets called. We had to do this because the schema of each record is potentially different, and we have to get the schema for the AvroParquetWriter out of the record itself first. Previously the writer was built only once, in the "open" method, and from then on only the write method was called per record.

Since implementing this, our job crashes with "Connection unexpectedly closed by remote task manager 'internal company url'. This might indicate that the remote task manager was lost."

We did not run into any issues on our test environments, so we suspect this problem only occurs under higher loads such as we have on our production environment. Unfortunately we still don't have a proper means of reproducing that much load on our test environment to debug.

Could building the AvroParquetWriter on every write be causing the problem, and if so, why?

Any help in getting to the bottom of the issue would be really appreciated. Below is a code snippet of the class which uses the AvroParquetWriter.

Best regards,
Ivan Budincevic
Software engineer, bol.com
Netherlands

package com.bol.measure.timeblocks.files;

import com.bol.measure.timeblocks.measurement.SlottedMeasurements;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

public class SlottedMeasurementsWriter implements Writer<SlottedMeasurements> {
  ParquetWriter<GenericRecord> parquetWriter;
  private boolean overwrite;
  private Path path;

  public SlottedMeasurementsWriter(boolean overwrite) {
    this.overwrite = overwrite;
  }

  @Override
  public void open(FileSystem fs, Path path) throws IOException {
    this.path = path;
  }

  @Override
  public long flush() throws IOException {
    return parquetWriter.getDataSize();
  }

  @Override
  public long getPos() throws IOException {
    return parquetWriter.getDataSize();
  }

  @Override
  public void close() throws IOException {
    parquetWriter.close();
  }

  @Override
  public void write(SlottedMeasurements slot) throws IOException {
    final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
        AvroParquetWriter
            .<GenericRecord>builder(path)
            .withSchema(slot.getMeasurements().get(0).getSchema())
            .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
            .withDictionaryEncoding(true)
            .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0);
    if (overwrite) {
      writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
    }

    parquetWriter = writerBuilder.build();

    for (GenericRecord measurement : slot.getMeasurements()) {
      parquetWriter.write(measurement);
    }
  }

  @Override
  public Writer<SlottedMeasurements> duplicate() {
    return new SlottedMeasurementsWriter(this.overwrite);
  }
}
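In case it helps, this is more or less how the writer is hooked into the job; the helper method, base path and batch size below are placeholders rather than our actual configuration:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

// Illustrative wiring only: attach the custom Writer to a BucketingSink.
// "/data/measurements" and the 128 MB batch size are made-up placeholders.
public static void attachMeasurementsSink(DataStream<SlottedMeasurements> measurements) {
  BucketingSink<SlottedMeasurements> sink =
      new BucketingSink<SlottedMeasurements>("/data/measurements");
  sink.setWriter(new SlottedMeasurementsWriter(true));
  sink.setBatchSize(128L * 1024L * 1024L); // roll part files at roughly 128 MB
  measurements.addSink(sink);
}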
Hi Ivan,

I don't have much experience with Avro, but extracting the schema and creating a writer for each record sounds like a pretty expensive approach.
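If the schema only changes occasionally, one option might be to keep the writer around and rebuild it only when the incoming schema actually differs from the previous one. A rough, untested sketch of that idea; it assumes an extra currentSchema field (org.apache.avro.Schema) that your class does not have yet, and it still writes everything to the single path handed to open():

// Sketch only: reuse the ParquetWriter across write() calls and rebuild it
// solely when the incoming schema changes. Requires an additional field:
//   private org.apache.avro.Schema currentSchema;
@Override
public void write(SlottedMeasurements slot) throws IOException {
  final Schema schema = slot.getMeasurements().get(0).getSchema();
  if (parquetWriter == null || !schema.equals(currentSchema)) {
    if (parquetWriter != null) {
      parquetWriter.close();
    }
    currentSchema = schema;
    final AvroParquetWriter.Builder<GenericRecord> writerBuilder = AvroParquetWriter
        .<GenericRecord>builder(path)
        .withSchema(schema)
        .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
        .withDictionaryEncoding(true)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0);
    if (overwrite) {
      // Note: rebuilding at the same path replaces the previous file,
      // just like the original per-write version did.
      writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
    }
    parquetWriter = writerBuilder.build();
  }
  for (GenericRecord measurement : slot.getMeasurements()) {
    parquetWriter.write(measurement);
  }
}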
In reply to this post by Ivan Budincevic
Hi Ivan,
sure, the more work you do per record, the slower the sink will be. However, this should not influence (much) the liveness checks inside Flink. Do you get some meaningful entries in the TaskManagers' logs indicating the problem?

I'm no expert on Avro and don't know how much actual work it is to create such a writer, but from the code you gave:

- wouldn't your getPos() circumvent the BucketingSink's rolling file property?
- similarly for flush(), which may be dangerous during recovery (judging from its documentation - "returns the offset that the file must be truncated to at recovery")?
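To make the first point concrete: as far as I understand, the sink compares the reported position against its batch size to decide when to roll to a new part file, and parquetWriter.getDataSize() starts from zero again whenever the writer is rebuilt. Purely as an illustration (the bytesFromClosedWriters field is made up), keeping a running total would at least keep the reported position growing across rebuilds:

// Illustration only: bytesFromClosedWriters is a hypothetical extra field that
// would have to be increased by parquetWriter.getDataSize() right before every
// close()/rebuild of the writer.
private long bytesFromClosedWriters = 0L;

@Override
public long getPos() throws IOException {
  // getDataSize() only reflects the current writer (written plus buffered data),
  // so add the bytes produced by writers that were already closed.
  return bytesFromClosedWriters + parquetWriter.getDataSize();
}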
Nico

On Tuesday, 7 November 2017 19:51:35 CET Ivan Budincevic wrote:
> Could building the AvroParquetWriter on every write be causing the problem,
> and if so, why?
> [...]