AvroParquetWriter may cause task managers to get lost

AvroParquetWriter may cause task managers to get lost

Ivan Budincevic

Hi all,

 

We recently implemented a feature in our streaming Flink job in which we build an AvroParquetWriter every time the overridden “write” method from org.apache.flink.streaming.connectors.fs.Writer gets called. We had to do this because the schema of each record is potentially different, and we first have to extract the schema for the AvroParquetWriter from the record itself. Previously the writer was built only once, in the “open” method, and from then on only the write method was called per record.

 

Since implementing this, our job crashes with “Connection unexpectedly closed by remote task manager ‘internal company url’. This might indicate that the remote task manager was lost.”

 

We did not run into any issues in our test environments, so we suspect this problem occurs only under higher load, such as we have in our production environment. Unfortunately, we still don’t have a proper means of reproducing that much load in our test environment to debug.

 

Could building the AvroParquetWriter on every write be causing the problem, and if so, why would that be the case?

 

Any help in getting to the bottom of the issue would be really appreciated. Below is a code snippet of the class that uses the AvroParquetWriter.

 

Best regards,

Ivan Budincevic

Software engineer, bol.com

Netherlands

 

package com.bol.measure.timeblocks.files;

import com.bol.measure.timeblocks.measurement.SlottedMeasurements;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

public class SlottedMeasurementsWriter implements Writer<SlottedMeasurements> {
  private transient ParquetWriter<GenericRecord> parquetWriter;
  private boolean overwrite;
  private Path path;

  public SlottedMeasurementsWriter(boolean overwrite) {
    this.overwrite = overwrite;
  }

  @Override
  public void open(FileSystem fs, Path path) throws IOException {
    this.path = path;
  }

  @Override
  public long flush() throws IOException {
    return parquetWriter.getDataSize();
  }

  @Override
  public long getPos() throws IOException {
    return parquetWriter.getDataSize();
  }

  @Override
  public void close() throws IOException {
    parquetWriter.close();
  }

  @Override
  public void write(SlottedMeasurements slot) throws IOException {
    // A new ParquetWriter is built on every call, using the schema taken from the
    // first record of the slot; the previously built writer is not closed here.

    final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
      AvroParquetWriter
        .<GenericRecord>builder(path)
        .withSchema(slot.getMeasurements().get(0).getSchema())
        .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
        .withDictionaryEncoding(true)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0);
    if (overwrite) {
      writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
    }

    parquetWriter = writerBuilder.build();

    for (GenericRecord measurement : slot.getMeasurements()) {
      parquetWriter.write(measurement);
    }
  }


  @Override
  public Writer<SlottedMeasurements> duplicate() {
    return new SlottedMeasurementsWriter(this.overwrite);
  }
}

 

 

Re: AvroParquetWriter may cause task managers to get lost

Fabian Hueske-2
Hi Ivan,

I don't have much experience with Avro, but extracting the schema and creating a writer for each record sounds like a pretty expensive approach.
This might result in significant load and increased GC activity.

Do all records have a different schema, or might it make sense to cache the writers in a weak hashmap?
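
For illustration only, a rough sketch of what such a per-schema writer cache could look like inside the SlottedMeasurementsWriter above. The names writersBySchema, writerFor and pathForSchema are made up, and the sketch assumes each schema gets its own part file (a Parquet file can only carry one schema); it would also need java.util.Map, java.util.WeakHashMap and org.apache.avro.Schema as additional imports.

  // Hypothetical cache of one ParquetWriter per Avro schema, reused across write() calls.
  private final Map<Schema, ParquetWriter<GenericRecord>> writersBySchema =
      new WeakHashMap<>();

  private ParquetWriter<GenericRecord> writerFor(Schema schema) throws IOException {
    ParquetWriter<GenericRecord> writer = writersBySchema.get(schema);
    if (writer == null) {
      // Same builder settings as in the original class, but built at most once per schema.
      writer = AvroParquetWriter.<GenericRecord>builder(pathForSchema(schema))
          .withSchema(schema)
          .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
          .withDictionaryEncoding(true)
          .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
          .build();
      writersBySchema.put(schema, writer);
    }
    return writer;
  }

  private Path pathForSchema(Schema schema) {
    // Hypothetical naming scheme: one part file per schema derived from the original path.
    return path.suffix("." + Math.abs(schema.hashCode()));
  }

Note that with a WeakHashMap, a writer whose schema key gets collected would never be closed, so the cached writers would still need explicit close() handling (for example, closing them all in close()); a plain HashMap may be the simpler variant.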

Best, Fabian


Re: AvroParquetWriter may cause task managers to get lost

Nico Kruber
In reply to this post by Ivan Budincevic
Hi Ivan,
sure, the more work you do per record, the slower the sink will be. However, this should not influence (much) the liveness checks inside Flink.
Do you get any meaningful entries in the TaskManagers' logs indicating the problem?

I'm no expert on Avro and don't know how much actual work it is to create such a writer, but from the code you gave:
- wouldn't your getPos() circumvent the BucketingSink's rolling file property?
- similarly for flush(), which may be dangerous during recovery (judging from its documentation: "returns the offset that the file must be truncated to at recovery")? (A hedged sketch of this concern follows below.)


Nico
