should i expect POJO serialization warnings when dealing w/ kryo protobuf serialization?

Jin Yi
I'm currently using protobufs and registering Kryo's ProtobufSerializer for them with the following snippet of code:

    import com.twitter.chill.protobuf.ProtobufSerializer;
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Registers chill-protobuf's Kryo ProtobufSerializer for a protobuf message class (null-safe).
    static void optionalRegisterProtobufSerializer(ExecutionConfig config, Class<?> clazz) {
        if (clazz != null) {
            config.registerTypeWithKryoSerializer(clazz, ProtobufSerializer.class);
        }
    }

    // User, View, Request, ... are our generated protobuf message classes (imports not shown).
    static void configureExecutionConfig(ExecutionConfig config) {
        optionalRegisterProtobufSerializer(config, User.class);
        optionalRegisterProtobufSerializer(config, View.class);
        optionalRegisterProtobufSerializer(config, Request.class);
        optionalRegisterProtobufSerializer(config, Insertion.class);
        optionalRegisterProtobufSerializer(config, Impression.class);
        optionalRegisterProtobufSerializer(config, Action.class);
        optionalRegisterProtobufSerializer(config, FlatEvent.class);
        optionalRegisterProtobufSerializer(config, LatestImpression.class);
    }

    // TODO - reuse with batch.
    // checkpointInterval, checkpointingMode, minPauseBetweenCheckpoints, unalignedCheckpoints and
    // checkpointTimeout are fields of the enclosing class (not shown in this snippet).
    void configureStreamExecutionEnvironment(StreamExecutionEnvironment env) {
        configureExecutionConfig(env.getConfig());
        if (checkpointInterval > 0) {
            env.enableCheckpointing(checkpointInterval);
        }
        env.getCheckpointConfig().setCheckpointingMode(checkpointingMode);
        // TODO - evaluate if we want setMinPauseBetweenCheckpoints.
        if (minPauseBetweenCheckpoints > 0) {
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(minPauseBetweenCheckpoints);
        }
        if (unalignedCheckpoints) {
            env.getCheckpointConfig().enableUnalignedCheckpoints();
        }
        if (checkpointTimeout > 0) {
            env.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout);
        }
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }

What concerns me is that I'm seeing INFO logs like these in the TaskManager logs:

org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class class ai.promoted.proto.event.FlatEvent cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2021-06-12 17:47:03,230 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class ai.promoted.proto.event.LatestImpression does not contain a getter for field impressionId_
2021-06-12 17:47:03,230 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class ai.promoted.proto.event.LatestImpression does not contain a setter for field impressionId_
2021-06-12 17:47:03,230 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class class ai.promoted.proto.event.LatestImpression cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2021-06-12 17:47:03,230 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class ai.promoted.proto.event.LatestImpression does not contain a getter for field impressionId_
2021-06-12 17:47:03,230 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class ai.promoted.proto.event.LatestImpression does not contain a setter for field impressionId_
2021-06-12 17:47:03,230 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class class ai.promoted.proto.event.LatestImpression cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.


Can I safely ignore these? Are they telling me that Flink is doing the right thing, since Kryo should kick in for GenericType?
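
Some context on where the "does not contain a getter/setter" lines come from. Flink's documented POJO rules require a public class with a public no-argument constructor, and every non-public field must be reachable through a matching getter and setter. Protobuf-generated message classes have private constructors and are immutable (setters live on the Builder), so Flink cannot treat them as POJOs and uses GenericType instead. The stdlib-only sketch below is a simplified approximation of those documented rules, not Flink's TypeExtractor code: `PojoRuleSketch`, `FakeProtoMessage`, `PlainPojo` and `pojoProblems` are hypothetical names, and the fake message is only loosely modeled on protoc output (it assumes a string field stored as `Object`, which is an assumption about generated code).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoRuleSketch {

    /** Hypothetical stand-in, loosely modeled on protoc output for a string field. */
    public static final class FakeProtoMessage {
        private volatile Object impressionId_ = "";   // assumed: stored as Object, not String
        private FakeProtoMessage() {}                  // generated messages hide their constructor
        public String getImpressionId() { return (String) impressionId_; } // no setter: immutable
    }

    /** A plain class that follows the documented POJO rules. */
    public static class PlainPojo {
        private String impressionId;
        public PlainPojo() {}
        public String getImpressionId() { return impressionId; }
        public void setImpressionId(String v) { impressionId = v; }
    }

    /** Lists reasons why a class fails a simplified version of Flink's documented POJO rules. */
    static List<String> pojoProblems(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(clazz.getModifiers())) {
            problems.add("class is not public");
        }
        try {
            clazz.getConstructor(); // only finds a *public* no-arg constructor
        } catch (NoSuchMethodException e) {
            problems.add("no public no-arg constructor");
        }
        for (Field f : clazz.getDeclaredFields()) {
            int mod = f.getModifiers();
            if (f.isSynthetic() || Modifier.isStatic(mod) || Modifier.isTransient(mod) || Modifier.isPublic(mod)) {
                continue; // skipped, or directly accessible without accessors
            }
            String key = normalize(f.getName());
            boolean getter = false;
            boolean setter = false;
            for (Method m : clazz.getMethods()) {
                String name = normalize(m.getName());
                // Getter: "get<field>", no parameters, returns exactly the field type.
                if (name.equals("get" + key) && m.getParameterCount() == 0
                        && m.getReturnType().equals(f.getType())) {
                    getter = true;
                }
                // Setter: "set<field>", one parameter of exactly the field type.
                if (name.equals("set" + key) && m.getParameterCount() == 1
                        && m.getParameterTypes()[0].equals(f.getType())) {
                    setter = true;
                }
            }
            if (!getter) problems.add("no getter for field " + f.getName());
            if (!setter) problems.add("no setter for field " + f.getName());
        }
        return problems;
    }

    /** Sketch-only choice: compare names case-insensitively and ignore underscores. */
    private static String normalize(String name) {
        return name.toLowerCase().replace("_", "");
    }

    public static void main(String[] args) {
        System.out.println(pojoProblems(FakeProtoMessage.class));
        System.out.println(pojoProblems(PlainPojo.class));
    }
}
```

Running `main` lists a missing public constructor plus a missing getter and setter for `impressionId_` on the fake message (the getter's `String` return type does not match the `Object` field), and an empty list for `PlainPojo`. Flink's real matching logic is more involved and may report different reasons for a given class; the outcome that matters here is the same: the type falls back to GenericType.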

Re: should i expect POJO serialization warnings when dealing w/ kryo protobuf serialization?

Yun Gao
Hi Jin,

The warning would be given as long as trying to parse the type
as PoJo failed, and turn to the Kryo serializer. The registered 
ProtobufSerializer would acts as a plugin inside the kryo serializer.
Thus the warning should be able to be ignored. When serializing
it would first turn to the kryo serializer, and kryo serializer would check
the registered "plugin serializer" and turn to use ProtobufSerialize. 
You may also verify that via taking stack snapshot at the TM side for
double confirmation.

Best,
Yun
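
The dispatch Yun describes can be modeled in a few lines. The sketch below is a toy, stdlib-only model, not Flink's or Kryo's code: `KryoFallbackSketch`, `GenericSerializer`, `FakeEvent` and the byte formats are all hypothetical. It shows the idea that a type falling back to GenericType is handed to one generic serializer, which first checks a per-class registry (the role ProtobufSerializer plays after `registerTypeWithKryoSerializer`) and only uses its default path for unregistered types.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class KryoFallbackSketch {

    /** Hypothetical message type that failed POJO analysis (stands in for FlatEvent). */
    static final class FakeEvent {
        final String id;
        FakeEvent(String id) { this.id = id; }
    }

    /** Hypothetical generic serializer: registered per-class serializers win over the default path. */
    static final class GenericSerializer {
        private final Map<Class<?>, Function<Object, byte[]>> registered = new HashMap<>();

        void register(Class<?> type, Function<Object, byte[]> serializer) {
            registered.put(type, serializer);
        }

        /** Names the path that would be taken for a type, for inspection. */
        String pathFor(Class<?> type) {
            return registered.containsKey(type) ? "registered" : "default";
        }

        byte[] serialize(Object value) {
            Function<Object, byte[]> s = registered.get(value.getClass());
            if (s != null) {
                return s.apply(value); // analogous to Kryo delegating to the registered ProtobufSerializer
            }
            // Default path; a real Kryo FieldSerializer would walk the fields reflectively instead.
            return value.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        GenericSerializer kryoLike = new GenericSerializer();
        // Analogous to config.registerTypeWithKryoSerializer(FlatEvent.class, ProtobufSerializer.class).
        kryoLike.register(FakeEvent.class,
                v -> ("proto:" + ((FakeEvent) v).id).getBytes(StandardCharsets.UTF_8));

        byte[] bytes = kryoLike.serialize(new FakeEvent("abc"));
        System.out.println(kryoLike.pathFor(FakeEvent.class));
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
        System.out.println(kryoLike.pathFor(String.class));
    }
}
```

The design point is that the GenericType log line only says which top-level serializer Flink picked; which per-class serializer runs underneath is decided later by the registry lookup.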


------------------Original Mail ------------------
Sender:Jin Yi <[hidden email]>
Send Date:Sun Jun 13 03:54:43 2021
Recipients:User-Flink <[hidden email]>
Subject:should i expect POJO serialization warnings when dealing w/ kryo protobuf serialization?