I'm currently using protobufs, and I register the protobuf serializers with Kryo using the following snippet of code:
static void optionalRegisterProtobufSerializer(ExecutionConfig config, Class<?> clazz) {
    if (clazz != null) {
        config.registerTypeWithKryoSerializer(clazz, ProtobufSerializer.class);
    }
}

static void configureExecutionConfig(ExecutionConfig config) {
    optionalRegisterProtobufSerializer(config, User.class);
    optionalRegisterProtobufSerializer(config, View.class);
    optionalRegisterProtobufSerializer(config, Request.class);
    optionalRegisterProtobufSerializer(config, Insertion.class);
    optionalRegisterProtobufSerializer(config, Impression.class);
    optionalRegisterProtobufSerializer(config, Action.class);
    optionalRegisterProtobufSerializer(config, FlatEvent.class);
    optionalRegisterProtobufSerializer(config, LatestImpression.class);
}

// TODO - reuse with batch.
void configureStreamExecutionEnvironment(StreamExecutionEnvironment env) {
    configureExecutionConfig(env.getConfig());
    if (checkpointInterval > 0) {
        env.enableCheckpointing(checkpointInterval);
    }
    env.getCheckpointConfig().setCheckpointingMode(checkpointingMode);
    // TODO - evaluate if we want setMinPauseBetweenCheckpoints.
    if (minPauseBetweenCheckpoints > 0) {
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(minPauseBetweenCheckpoints);
    }
    if (unalignedCheckpoints) {
        env.getCheckpointConfig().enableUnalignedCheckpoints();
    }
    if (checkpointTimeout > 0) {
        env.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout);
    }
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}

What concerns me is that I'm seeing INFO logs like these in the TaskManager logs:

org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class ai.promoted.proto.event.FlatEvent cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2021-06-12 17:47:03,230 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class ai.promoted.proto.event.LatestImpression does not contain a getter for field impressionId_
2021-06-12 17:47:03,230 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class ai.promoted.proto.event.LatestImpression does not contain a setter for field impressionId_
2021-06-12 17:47:03,230 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class ai.promoted.proto.event.LatestImpression cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

Can I safely ignore these? Are they telling me that Flink is doing the right thing, since Kryo should kick in for a GenericType?
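To illustrate why a protobuf-generated class produces the "does not contain a getter/setter for field impressionId_" lines, here is a minimal sketch of a POJO field check. It is a simplified, hypothetical approximation of the rule in Flink's "Data Types & Serialization" docs (each non-static, non-transient field must be public or have a matching getter and setter), not Flink's TypeExtractor code; the real check has further requirements such as a public class and a public no-arg constructor. The name matching (lower-casing and dropping underscores) and the exact type comparison are assumptions chosen for illustration. FakeProtoMessage, PlainPojo and fieldProblems are hypothetical names; FakeProtoMessage only mimics how protobuf-java typically stores a string field as java.lang.Object (so it can hold a String or a ByteString) while its getter returns String, and how messages have no setters because they are immutable.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class PojoFieldCheckSketch {

    // Hypothetical stand-in for a protobuf-generated message (not real generated code):
    // the string field is stored as Object, the getter returns String, and there is no setter.
    public static final class FakeProtoMessage {
        private volatile Object impressionId_ = "";
        public String getImpressionId() { return (String) impressionId_; }
    }

    // A plain class whose only field passes the simplified rule.
    public static class PlainPojo {
        private long impressionId;
        public long getImpressionId() { return impressionId; }
        public void setImpressionId(long impressionId) { this.impressionId = impressionId; }
    }

    // Lower-case and drop underscores so "impressionId_" and "getImpressionId" can be compared.
    static String normalize(String name) {
        return name.toLowerCase(Locale.ROOT).replace("_", "");
    }

    // A getter must match by normalized name, take no arguments and return the field's exact type.
    static boolean hasGetter(Class<?> c, Field f) {
        String n = normalize(f.getName());
        for (Method m : c.getMethods()) {
            String mn = normalize(m.getName());
            if ((mn.equals("get" + n) || mn.equals("is" + n))
                    && m.getParameterCount() == 0
                    && m.getReturnType().equals(f.getType())) {
                return true;
            }
        }
        return false;
    }

    // A setter must match by normalized name, take one argument of the field's type and return void.
    static boolean hasSetter(Class<?> c, Field f) {
        String n = normalize(f.getName());
        for (Method m : c.getMethods()) {
            if (normalize(m.getName()).equals("set" + n)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].equals(f.getType())
                    && m.getReturnType().equals(void.class)) {
                return true;
            }
        }
        return false;
    }

    // Returns one message per violation; an empty list means every field passes.
    static List<String> fieldProblems(Class<?> c) {
        List<String> problems = new ArrayList<>();
        for (Field f : c.getDeclaredFields()) {
            int mod = f.getModifiers();
            if (f.isSynthetic() || Modifier.isStatic(mod) || Modifier.isTransient(mod) || Modifier.isPublic(mod)) {
                continue; // synthetic, static and transient fields are skipped; public fields need no accessors
            }
            if (!hasGetter(c, f)) problems.add("no getter for field " + f.getName());
            if (!hasSetter(c, f)) problems.add("no setter for field " + f.getName());
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(fieldProblems(FakeProtoMessage.class)); // [no getter for field impressionId_, no setter for field impressionId_]
        System.out.println(fieldProblems(PlainPojo.class));        // []
    }
}
```

In this sketch the getter for impressionId_ is rejected because of the Object/String type mismatch, and the setter is simply absent, which lines up with the pair of log lines above; a class with any such field falls back to GenericType.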
Hi Jin,

This message is logged whenever Flink fails to analyze a type as a POJO and falls back to the Kryo serializer. The registered ProtobufSerializer acts as a plugin inside the Kryo serializer, so the message should be safe to ignore. At serialization time, Flink first hands the object to the Kryo serializer, and the Kryo serializer checks for a registered "plugin" serializer for that class and uses ProtobufSerializer.

You can also double-check this by taking a stack snapshot on the TaskManager side.

Best,
Yun
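The dispatch Yun describes (generic Kryo path first, then the serializer registered for the concrete class) can be modeled with a small registry. This is a hypothetical, simplified model, not Kryo's or Flink's actual classes: GenericSerializer, Serializer, ProtoMessageStub and Unregistered are illustrative names, the lookup is by exact class only, and the "reflective" fallback merely stands in for the default serializer Kryo would use for an unregistered class.

```java
import java.util.HashMap;
import java.util.Map;

public class SerializerDispatchSketch {

    // Simplified stand-in for a serializer: returns a description of how the object was written.
    interface Serializer { String write(Object o); }

    // Hypothetical model of the Kryo-side lookup described above, NOT Kryo's real API.
    static final class GenericSerializer {
        private final Map<Class<?>, Serializer> registered = new HashMap<>();
        private final Serializer fallback = o -> "reflective:" + o.getClass().getSimpleName();

        // Plays the role of registerTypeWithKryoSerializer(clazz, ProtobufSerializer.class).
        void register(Class<?> type, Serializer s) { registered.put(type, s); }

        String serialize(Object o) {
            // Exact-class lookup first; unregistered classes fall back to the default serializer.
            return registered.getOrDefault(o.getClass(), fallback).write(o);
        }
    }

    static final class ProtoMessageStub {}   // hypothetical placeholder for a protobuf message class
    static final class Unregistered {}       // a class with no registered serializer

    public static void main(String[] args) {
        GenericSerializer kryoLike = new GenericSerializer();
        kryoLike.register(ProtoMessageStub.class, o -> "protobuf:" + o.getClass().getSimpleName());
        System.out.println(kryoLike.serialize(new ProtoMessageStub())); // protobuf:ProtoMessageStub
        System.out.println(kryoLike.serialize(new Unregistered()));     // reflective:Unregistered
    }
}
```

The point of the model is that the "cannot be used as a POJO type" decision happens before this lookup: it only routes the type onto the generic path, and the registered serializer still handles the registered classes from there.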