Serialization performance


Serialization performance

Newport, Billy

We’ve been working on performance for a while now. We’re using Flink 1.2 at the moment. We are writing batch jobs which read Avro and Parquet input files and produce Parquet files.

Flink serialization costs seem to be the most significant part of our wall clock time. We have written a custom Kryo serializer for GenericRecord which is similar to the Spark one in that it reuses Avro DatumReaders and DatumWriters, but writes a hash fingerprint for the schema instead of the schema itself.

We have subclassed most of the Rich* classes in Flink and now also pass a Builder class holding the Avro schema to the constructors. When Flink inflates these operators, the builders are inflated too and preregister the Avro schema under its fingerprint in a static map, which the inflation/writing code uses later.

This is working pretty well for us; it’s roughly 10-20x faster than just using GenericRecords normally. The question is this: is this the right way to do it? If it is, we can contribute it, and then the question becomes how to modify Beam so that it uses this under the covers. As far as I can tell, we can’t use Beam at all right now because of the performance issues with GenericRecord.

The other performance optimization is basically removing filters, which again seem to double wall clock time. We wrote an embedded demux output format which receives a Tuple<Enum, GenericRecord> and writes to a different Parquet file depending on the Enum. This was 2x faster than a naïve chain of 4 filters feeding 4 Parquet output formats.
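
As a rough illustration of the shape (not the exact class we run; the routing enum and the per-enum delegate map are placeholders), such a demux output format just owns one delegate OutputFormat per enum value and routes each element to the matching delegate, so one sink replaces N filter-plus-sink chains:

import java.io.IOException;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class DemuxOutputFormat<E extends Enum<E>> implements OutputFormat<Tuple2<E, GenericRecord>> {

    // one delegate per enum value, e.g. a Parquet output format per target file
    private final Map<E, OutputFormat<GenericRecord>> delegates;

    public DemuxOutputFormat(Map<E, OutputFormat<GenericRecord>> delegates) {
        this.delegates = delegates;
    }

    @Override
    public void configure(Configuration parameters) {
        for (OutputFormat<GenericRecord> delegate : delegates.values()) {
            delegate.configure(parameters);
        }
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        for (OutputFormat<GenericRecord> delegate : delegates.values()) {
            delegate.open(taskNumber, numTasks);
        }
    }

    @Override
    public void writeRecord(Tuple2<E, GenericRecord> record) throws IOException {
        // route on the enum instead of filtering the whole DataSet once per sink
        delegates.get(record.f0).writeRecord(record.f1);
    }

    @Override
    public void close() throws IOException {
        for (OutputFormat<GenericRecord> delegate : delegates.values()) {
            delegate.close();
        }
    }
}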

 

Do these optimizations seem unnecessary to some? Is there some trick we’re missing?

 


Re: Serialization performance

Stephan Ewen
Hi!

Thanks for this writeup, very cool stuff!

For part (1) - serialization: I think that can be made a bit nicer. Avro is a bit of an odd citizen in Flink, because Flink serialization is actually schema aware, but does not integrate with Avro. That's why Avro types go through Kryo.

We should try and make Avro a first-class citizen.
  - The first step is to have a proper AvroSerializer. We have implemented one already, see "org.apache.flink.api.java.typeutils.runtime.AvroSerializer". It works with the ReflectDatumReader/Writer, but it would be a good baseline for all kinds of Avro-based serializers in Flink.

  - Then we need to figure out how the AvroSerializer is instantiated. We could just let the "GenericTypeInfo" create an Avro serializer for Avro types, and Kryo for everything else (see the sketch after this list).
  - The change would eventually have to sit behind a config flag in the ExecutionConfig (see "GenericTypeInfo.createSerializer()") to make sure we do not break the default serialization format within a major release version.
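
A hedged sketch of that dispatch, written as a standalone helper rather than an actual patch to GenericTypeInfo (the helper name and the IndexedRecord test for "is an Avro type" are assumptions; AvroSerializer, KryoSerializer and ExecutionConfig's force-Avro flag already exist in 1.2):

import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;

public class AvroOrKryo {
    // Hypothetical helper mirroring what GenericTypeInfo.createSerializer() could do:
    // Avro record types go to the AvroSerializer, everything else stays on Kryo.
    public static <T> TypeSerializer<T> serializerFor(Class<T> typeClass, ExecutionConfig config) {
        if (config.isForceAvroEnabled() && IndexedRecord.class.isAssignableFrom(typeClass)) {
            return new AvroSerializer<>(typeClass);
        }
        return new KryoSerializer<>(typeClass, config);
    }
}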


A side note: if you actually use this through Beam, I am not sure what will happen, because as far as I know Beam uses its own serialization system entirely and Flink only sees byte coders from Beam. Aljoscha can probably add more detail here.


For part (2) - the filters: if I understand correctly, you "split" the data into different result sets that go to different sinks? The DataStream API has a "split/select" type of construct which would help there; the DataSet API does not have something like that. If you are looking for peak performance, the demux output format seems like a good workaround on the DataSet side.
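
For reference, a rough sketch of what split/select looks like on the DataStream side (1.2-era API; the Kind enum and the print() sink are just placeholders):

import java.util.Collections;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;

public class SplitSelectSketch {
    enum Kind { INSERT, UPDATE, DELETE, NOOP }  // placeholder routing enum

    static void route(DataStream<Tuple2<Kind, GenericRecord>> tagged) {
        // tag each element with the name of the output it belongs to
        SplitStream<Tuple2<Kind, GenericRecord>> split =
                tagged.split(value -> Collections.singletonList(value.f0.name()));
        // select one named sub-stream per enum value; each can feed its own sink
        DataStream<Tuple2<Kind, GenericRecord>> inserts = split.select(Kind.INSERT.name());
        inserts.print();  // stand-in for a real Parquet sink
    }
}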


Greetings,
Stephan




RE: Serialization performance

Newport, Billy

This is what we’re using as our serializer:

Somewhere:

    env.addDefaultKryoSerializer(Record.class, GRKryoSerializer.class);

then

public class GRKryoSerializer extends Serializer<GenericData.Record>
{
    static class AvroStuff
    {
        Schema schema;
        String comment;
        long key;
        DatumReader<GenericRecord> reader;
        DatumWriter<GenericRecord> writer;
    }

    static Map<Long, AvroStuff> schemaMap = new ConcurrentHashMap<>();
    static Map<Schema, Long> schemaToFingerprintMap = new ConcurrentHashMap<>();
    static Logger log = Logger.getLogger(GRKryoSerializer.class.getName());

    static public void preregisterSchema(String comment, Schema schema)
    {
        if (!schemaToFingerprintMap.containsKey(schema)) {
            long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
            AvroStuff stuff = new AvroStuff();
            stuff.schema = schema;
            stuff.comment = comment;
            stuff.key = fingerprint;
            stuff.reader = new GenericDatumReader<>(schema);
            stuff.writer = new GenericDatumWriter<>(schema);
            log.info(String.format("Preregistering schema for %s with fingerprint %d", comment, fingerprint));
            schemaMap.put(fingerprint, stuff);
            schemaToFingerprintMap.put(schema, fingerprint);
        }
    }

    public GRKryoSerializer() {
    }

    static public void clearSchemaCache()
    {
        schemaToFingerprintMap.clear();
        schemaMap.clear();
    }

    static public AvroStuff getStuffFor(GenericRecord o)
    {
        return getStuffFor(o.getSchema());
    }

    static public AvroStuff getStuffFor(Schema schema)
    {
        Long fingerprint = schemaToFingerprintMap.get(schema);
        if (fingerprint == null)
        {
            fingerprint = SchemaNormalization.parsingFingerprint64(schema);
            log.info(String.format("No fingerprint. Generated %d for schema %s", fingerprint, schema.toString(true)));
            schemaToFingerprintMap.put(schema, fingerprint);

            throw new RuntimeException("Unknown schema " + schema.toString(true));
        }
        return schemaMap.get(fingerprint);
    }

    @Override
    public void write(Kryo kryo, Output output, GenericData.Record object)
    {
        AvroStuff stuff = getStuffFor(object);

        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(output, null);
        try {
            // write the schema key not the schema
            encoder.writeLong(stuff.key);
            // write the binary version of the fields only
            stuff.writer.write(object, encoder);
            encoder.flush();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public GenericData.Record read(Kryo kryo, Input input, Class<GenericData.Record> type)
    {
        BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(input, null);
        long fingerPrint;
        try {
            // read the key
            fingerPrint = decoder.readLong();
            // find it
            AvroStuff stuff = schemaMap.get(fingerPrint);
            // inflate using correct preregistered inflator
            return (Record) stuff.reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

 

We add an instance of one of these to all our Flink Rich operations:

 

 

public class GRBuilder implements Serializable {
    private static final long serialVersionUID = -3441080975441473751L;

    String schemaString;
    String comment;

    transient GenericRecordBuilder builder = null;
    transient Schema schema = null;

    public String getComment() {
        return comment;
    }

    public void setSchema(Schema schema) {
        this.schema = schema;
    }

    public void registerSchema() {
        GRKryoSerializer.preregisterSchema(comment, getSchema());
    }

    private void readObject(ObjectInputStream input)
            throws IOException, ClassNotFoundException
    {
        realReadObject(input);
    }

    private void writeObject(ObjectOutputStream output)
            throws IOException, ClassNotFoundException
    {
        realWriteObject(output);
    }

    // Ensure on inflation, the schema is registered against
    // the hashcode locally so we can inflate that type

    protected void realReadObject(ObjectInputStream input)
            throws IOException, ClassNotFoundException
    {
        schemaString = input.readUTF();
        comment = input.readUTF();
        builder = null;
        schema = null;
        GRKryoSerializer.preregisterSchema(comment, getSchema());
    }

    protected void realWriteObject(ObjectOutputStream output)
            throws IOException, ClassNotFoundException
    {
        output.writeUTF(schemaString);
        output.writeUTF(comment);
    }

    public GRBuilder()
    {
    }

    public GRBuilder(String comment, Schema s) {
        schemaString = s.toString();
        builder = null;
        this.comment = comment;

        GRKryoSerializer.preregisterSchema(comment, s);
    }

    public synchronized GenericRecordBuilder getBuilder()
    {
        if (builder == null)
        {
            builder = new GenericRecordBuilder(getSchema());
        }
        return builder;
    }

    public synchronized Schema getSchema()
    {
        if (schema == null)
        {
            Schema.Parser p = new Schema.Parser();
            schema = p.parse(schemaString);
        }
        return schema;
    }
}

 

Our mappers and such use the GRBuilder on the rich FlatMap class, for example, to get a builder for creating the output records that they collect. We need a GRBuilder for each possible GenericRecord schema as a field on the map function object.
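
Roughly, that pattern looks like the following sketch (the field name and schema wiring are simplified placeholders, not our production code):

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

public class ExampleFlatMap extends RichFlatMapFunction<GenericData.Record, GenericData.Record> {

    // serialized with the function; on inflation its readObject() pre-registers
    // the output schema's fingerprint before any record is serialized
    private final GRBuilder outputBuilder;

    public ExampleFlatMap(GRBuilder outputBuilder) {
        this.outputBuilder = outputBuilder;
    }

    @Override
    public void flatMap(GenericData.Record in, Collector<GenericData.Record> out) {
        GenericRecordBuilder b = outputBuilder.getBuilder();
        b.set("someField", in.get("someField"));  // illustrative field; assumes remaining fields have defaults
        out.collect(b.build());
    }
}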

 

If we were to refactor this using the GenericTypeInfo or AvroSerializer, how would you suggest doing it?

 

Thanks

 

 


Re: Serialization performance

Stephan Ewen
Hi!

I can write some more details later; here is the short answer:

  - Your serializer would go into something like the AvroSerializer.

  - When you instantiate the AvroSerializer in GenericTypeInfo.createSerializer(ExecutionConfig), you pre-register the generic type itself, plus all types in "ExecutionConfig.getRegisteredKryoTypes()"
    (we abuse the Kryo type registration as an Avro type registration initially; we would have to polish that later).

  - The registered types are classes, but since they are Avro types, you should be able to get their schema from them (via ReflectData or similar).

That way, Flink would internally forward all the registrations for you (similar to how it forwards Kryo registrations) and you don't have to do it manually.
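
As a very rough sketch, reusing your preregisterSchema() as the registration hook (the helper class and the SpecificRecord filter are assumptions; getRegisteredKryoTypes() is the existing ExecutionConfig accessor):

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.ExecutionConfig;

public class AvroRegistrationForwarder {
    // Forward Flink's Kryo type registrations into the fingerprint cache used by the
    // custom serializer, deriving each schema via Avro's ReflectData. This would
    // conceptually run where the serializer is instantiated, i.e. in
    // GenericTypeInfo.createSerializer(ExecutionConfig).
    public static void forwardRegistrations(ExecutionConfig config) {
        for (Class<?> registered : config.getRegisteredKryoTypes()) {
            if (SpecificRecord.class.isAssignableFrom(registered)) {  // assumed "is an Avro type" test
                Schema schema = ReflectData.get().getSchema(registered);
                GRKryoSerializer.preregisterSchema(registered.getName(), schema);
            }
        }
    }
}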

Stephan



Re: Serialization performance

Aljoscha Krettek
Hi Billy,
on the Beam side, you have probably looked into writing your own Coder (the equivalent of a TypeSerializer in Flink). If so, did that not work out for you? And if it didn't, why?

Best,
Aljoscha
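
For context, Beam also ships a built-in AvroCoder that can be set explicitly when the schema is known up front; whether it avoids the GenericRecord overhead seen here is a separate question. A minimal sketch (the helper name is a placeholder):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.values.PCollection;

public class BeamAvroCoderExample {
    // Pin Beam's AvroCoder on a GenericRecord collection for a known schema, so the
    // schema travels with the coder rather than with every record.
    public static PCollection<GenericRecord> withAvroCoder(PCollection<GenericRecord> records, Schema schema) {
        return records.setCoder(AvroCoder.of(GenericRecord.class, schema));
    }
}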



RE: Serialization performance

Newport, Billy

Some history:

We’ve done a bunch of Spark work over the last 2 years, but on my project we abandoned Spark around Spark 1.6 (April 2016) due to bugs, extreme memory usage and general efficiency problems, so we lost confidence in it completely at that time. We did a straight MR version of the code and it was dramatically faster in wall clock time, and more efficient, using fewer cores and significantly less memory for the same workload. I don’t see us moving back to Spark at all for this project unless they pivoted to stream-based RDDs, but then isn’t that Flink?

Flink became attractive after the Spark experience. We needed something to code in that’s easier than MR/Tez but also very efficient resource-wise. The philosophy of streaming as the primary approach seems to me like a winner over the Spark philosophy, and Beam’s value-add around first-class windowing, a single stream/batch API and multiple runners became attractive at that time as well.

Beam and the Flink runner helped us take Flink more seriously, especially given Flink’s efficient runtime characteristics and the improved suite of tests that the Beam Flink runner brought to Flink, resulting in hopefully better quality. Beam on Spark was never considered because of our experience with Spark as a runtime.

So I did not start with Beam, which was the original plan back in December, because Beam lacked Parquet and partitioning support. The lack of Parquet support especially made me uneasy about using Beam for batch; without Parquet I wondered how widely it could really be used for Hadoop batch work.

As a result, I started with Flink and Parquet. Even so, we’re writing our own output formats anyway given the Flink inefficiencies we currently see, and we’re doing our own partitioning support now. I expect to pivot over to Beam in late Q2/Q3; it’s pretty close to Flink from our point of view operator-wise, so that shouldn’t be a big pivot. The main reason for a future pivot to Beam is primarily a single stream/batch API versus two with Flink. Plus, a standard API that can port to and run on different technologies is very attractive. Rewriting Spark code in something else is expensive and we want a better ROI on our business code, i.e. a longer life span, and Beam seems the best way to get that, so we can swap the runner and reuse our code investment as better technologies come along.

 

Billy

 


Re: Serialization performance

Stephan Ewen
Hi Billy!

Out of curiosity: Were you able to hack some direct Avro support as I described in the brief writeup, or do you need some more details?

Stephan

On Fri, Mar 3, 2017 at 2:38 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi Billy,
on the Beam side, you probably have looked into writing your own Coder (the equivalent of a TypeSerializer in Flink). If yes, did that not work out for you? And if yes, why?

Best,
Aljoscha


On Thu, Mar 2, 2017, at 22:02, Stephan Ewen wrote:
Hi!

I can write some more details later, here the short answer:

  - Your serializer would do into something like the AvroSerializer

  - When you instantiate the AvroSerializer in GenericTypeInfo.createSerializer(ExecutionConfig), you pre-register the type of the generic type, plus all types in "ExecutionConfig.getRegisteredKryoTypes()"
    (we abuse the Kryo type registration here as an Avro type registration initially, would have to polish that later)

  - The registered types are classes, but since they are Avro types, you should be able to get their schema (for Reflect Data or so)

That way, Flink would internally forward all the registrations for you (similar as it forwards Kryo registrations) and you don't have to manually do that.

Stephan


On Thu, Mar 2, 2017 at 9:31 PM, Newport, Billy <[hidden email]> wrote:

This is what we’re using as our serializer:

 

Somewhere:

 

           env.addDefaultKryoSerializer(Record.class, GRKryoSerializer.class);

 

then

 

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class GRKryoSerializer extends Serializer<GenericData.Record> {

    /** Everything needed to read/write one schema: the schema plus reusable datum reader/writer. */
    static class AvroStuff {
        Schema schema;
        String comment;
        long key;
        DatumReader<GenericRecord> reader;
        DatumWriter<GenericRecord> writer;
    }

    static Map<Long, AvroStuff> schemaMap = new ConcurrentHashMap<>();
    static Map<Schema, Long> schemaToFingerprintMap = new ConcurrentHashMap<>();
    static Logger log = Logger.getLogger(GRKryoSerializer.class.getName());

    public GRKryoSerializer() {
    }

    /** Register a schema under its 64-bit Avro fingerprint so records can be written/read without the schema itself. */
    public static void preregisterSchema(String comment, Schema schema) {
        if (!schemaToFingerprintMap.containsKey(schema)) {
            long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
            AvroStuff stuff = new AvroStuff();
            stuff.schema = schema;
            stuff.comment = comment;
            stuff.key = fingerprint;
            stuff.reader = new GenericDatumReader<>(schema);
            stuff.writer = new GenericDatumWriter<>(schema);
            log.info(String.format("Preregistering schema for %s with fingerprint %d", comment, fingerprint));
            schemaMap.put(fingerprint, stuff);
            schemaToFingerprintMap.put(schema, fingerprint);
        }
    }

    public static void clearSchemaCache() {
        schemaToFingerprintMap.clear();
        schemaMap.clear();
    }

    public static AvroStuff getStuffFor(GenericRecord o) {
        return getStuffFor(o.getSchema());
    }

    public static AvroStuff getStuffFor(Schema schema) {
        Long fingerprint = schemaToFingerprintMap.get(schema);
        if (fingerprint == null) {
            // Schema was never preregistered: cache its computed fingerprint, log it, and fail fast.
            fingerprint = SchemaNormalization.parsingFingerprint64(schema);
            log.info(String.format("No fingerprint. Generated %d for schema %s", fingerprint, schema.toString(true)));
            schemaToFingerprintMap.put(schema, fingerprint);
            throw new RuntimeException("Unknown schema " + schema.toString(true));
        }
        return schemaMap.get(fingerprint);
    }

    @Override
    public void write(Kryo kryo, Output output, GenericData.Record object) {
        AvroStuff stuff = getStuffFor(object);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(output, null);
        try {
            // write the schema fingerprint, not the schema
            encoder.writeLong(stuff.key);
            // then the binary encoding of the fields only
            stuff.writer.write(object, encoder);
            encoder.flush();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public GenericData.Record read(Kryo kryo, Input input, Class<GenericData.Record> type) {
        BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(input, null);
        try {
            // read the fingerprint and look up the preregistered reader for it
            long fingerprint = decoder.readLong();
            AvroStuff stuff = schemaMap.get(fingerprint);
            if (stuff == null) {
                throw new RuntimeException("No schema preregistered for fingerprint " + fingerprint);
            }
            // inflate using the correct preregistered reader
            return (Record) stuff.reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
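
As a sanity check, the serializer can be exercised with plain Kryo outside Flink. This is only a sketch with a made-up schema; the addDefaultSerializer call stands in for what env.addDefaultKryoSerializer arranges inside Flink.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class GRKryoRoundTrip {
    public static void main(String[] args) {
        // Made-up schema, for this example only
        Schema schema = SchemaBuilder.record("Trade").fields()
                .requiredString("id")
                .requiredDouble("price")
                .endRecord();

        // The schema registration is normally done by GRBuilder instances when they are deserialized
        GRKryoSerializer.preregisterSchema("trade", schema);
        Kryo kryo = new Kryo();
        kryo.addDefaultSerializer(GenericData.Record.class, GRKryoSerializer.class);

        GenericData.Record record = new GenericRecordBuilder(schema)
                .set("id", "T-1")
                .set("price", 101.25)
                .build();

        Output out = new Output(1024, -1);     // buffer grows as needed
        kryo.writeObject(out, record);
        out.flush();

        Input in = new Input(out.toBytes());
        GenericData.Record copy = kryo.readObject(in, GenericData.Record.class);
        System.out.println(copy);              // the schema is resolved from the fingerprint map, not from the stream
    }
}
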

 

We add an instance of the following builder class to all our Flink Rich operations:

 

 

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecordBuilder;

public class GRBuilder implements Serializable {

    private static final long serialVersionUID = -3441080975441473751L;

    String schemaString;
    String comment;

    // Rebuilt lazily on each worker; never shipped over the wire
    transient GenericRecordBuilder builder = null;
    transient Schema schema = null;

    public GRBuilder() {
    }

    public GRBuilder(String comment, Schema s) {
        schemaString = s.toString();
        builder = null;
        this.comment = comment;
        GRKryoSerializer.preregisterSchema(comment, s);
    }

    public String getComment() {
        return comment;
    }

    public void setSchema(Schema schema) {
        this.schema = schema;
    }

    public void registerSchema() {
        GRKryoSerializer.preregisterSchema(comment, getSchema());
    }

    private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
        realReadObject(input);
    }

    private void writeObject(ObjectOutputStream output) throws IOException {
        realWriteObject(output);
    }

    // Ensure that on inflation the schema is registered against its
    // fingerprint locally, so GRKryoSerializer can inflate that type
    protected void realReadObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
        schemaString = input.readUTF();
        comment = input.readUTF();
        builder = null;
        schema = null;
        GRKryoSerializer.preregisterSchema(comment, getSchema());
    }

    protected void realWriteObject(ObjectOutputStream output) throws IOException {
        output.writeUTF(schemaString);
        output.writeUTF(comment);
    }

    public synchronized GenericRecordBuilder getBuilder() {
        if (builder == null) {
            builder = new GenericRecordBuilder(getSchema());
        }
        return builder;
    }

    public synchronized Schema getSchema() {
        if (schema == null) {
            Schema.Parser p = new Schema.Parser();
            schema = p.parse(schemaString);
        }
        return schema;
    }
}

 

Our mappers and similar operators use the GRBuilder on the rich FlatMap class, for example, to get a GenericRecordBuilder for creating the output records they collect. We need a GRBuilder for each possible GenericRecord schema as a field on the Map object.
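
For example, such a mapper might look roughly like the following sketch; the EnrichMapper class and the field names are made up for illustration.

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class EnrichMapper extends RichFlatMapFunction<GenericRecord, GenericData.Record> {

    private final GRBuilder outBuilder;   // carries the output schema; re-registers it when deserialized on a worker

    public EnrichMapper(GRBuilder outBuilder) {
        this.outBuilder = outBuilder;
    }

    @Override
    public void open(Configuration parameters) {
        // readObject() has already registered the schema; preregisterSchema is idempotent, so this is just belt and braces
        outBuilder.registerSchema();
    }

    @Override
    public void flatMap(GenericRecord in, Collector<GenericData.Record> out) {
        // field names are made up for illustration
        GenericData.Record result = outBuilder.getBuilder()
                .set("id", in.get("id"))
                .set("priceUsd", in.get("price"))
                .build();
        out.collect(result);
    }
}
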

 

If we were to refactor this using GenericTypeInfo or the AvroSerializer, how would you suggest doing it?

 

Thanks

 

 




Reply | Threaded
Open this post in threaded view
|

RE: Serialization performance

Newport, Billy

I need more details. As far as I can tell, Flink isn’t really designed for plugging in serializers in a ‘nice’ way; it’s kind of hardcoded for Kryo right now.

 

 

From: Stephan Ewen [mailto:[hidden email]]
Sent: Tuesday, March 07, 2017 6:21 AM
To: [hidden email]
Subject: Re: Serialization performance

 

Hi Billy!

 

Out of curiosity: Were you able to hack some direct Avro support as I described in the brief writeup, or do you need some more details?

 

Stephan

 

On Fri, Mar 3, 2017 at 2:38 PM, Aljoscha Krettek <[hidden email]> wrote:

Hi Billy,

on the Beam side, you probably have looked into writing your own Coder (the equivalent of a TypeSerializer in Flink). If yes, did that not work out for you? And if yes, why?

 

Best,

Aljoscha
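
For reference, a Beam Coder built around the same fingerprint idea might look roughly like the sketch below. It assumes a Beam SDK where CustomCoder only requires encode(value, OutputStream) and decode(InputStream) (older releases passed an extra Context argument), and it assumes the coder lives in the same package as GRKryoSerializer so it can reuse that schema registry.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.avro.generic.GenericData;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.beam.sdk.coders.CustomCoder;

// Sketch only: same package as GRKryoSerializer, so it can reuse the fingerprint maps.
public class FingerprintGenericRecordCoder extends CustomCoder<GenericData.Record> {

    @Override
    public void encode(GenericData.Record value, OutputStream outStream) throws IOException {
        GRKryoSerializer.AvroStuff stuff = GRKryoSerializer.getStuffFor(value);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outStream, null);
        encoder.writeLong(stuff.key);          // fingerprint instead of the full schema
        stuff.writer.write(value, encoder);
        encoder.flush();
    }

    @Override
    public GenericData.Record decode(InputStream inStream) throws IOException {
        BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(inStream, null);
        long fingerprint = decoder.readLong();
        GRKryoSerializer.AvroStuff stuff = GRKryoSerializer.schemaMap.get(fingerprint);
        if (stuff == null) {
            throw new IOException("No schema preregistered for fingerprint " + fingerprint);
        }
        return (GenericData.Record) stuff.reader.read(null, decoder);
    }
}

Registering it would then go through the pipeline’s CoderRegistry, or a per-PCollection setCoder call, rather than Flink’s ExecutionConfig.
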

 

 

On Thu, Mar 2, 2017, at 22:02, Stephan Ewen wrote:

Hi!

 

I can write some more details later; here's the short answer:

 

  - Your serializer would go into something like the AvroSerializer

 

  - When you instantiate the AvroSerializer in GenericTypeInfo.createSerializer(ExecutionConfig), you pre-register the type of the generic type, plus all types in "ExecutionConfig.getRegisteredKryoTypes()"

    (we abuse the Kryo type registration here as an Avro type registration initially, would have to polish that later)

 

  - The registered types are classes, but since they are Avro types, you should be able to get their schema (for Reflect Data or so)

 


 

 

Reply | Threaded
Open this post in threaded view
|

Re: Serialization performance

Stephan Ewen
I'll try and add more details in a bit.

If you have some suggestions on how to make the serialization stack more extensible, please let us know!


But I think that hook does not work for Avro...



Reply | Threaded
Open this post in threaded view
|

Re: Serialization performance

rmetzger0
I've filed a JIRA for improving the support for Avro GenericRecords: https://issues.apache.org/jira/browse/FLINK-6022 
