Different deserialization schemas for Kafka keys and values

Manas Kale
Hi,
I have a Kafka topic on which the key is serialized in a custom format and the value is serialized as JSON. How do I create a FlinkKafkaConsumer that uses different deserialization schemas for the key and the value? Here's what I tried:

FlinkKafkaConsumer<Tuple2<MyClass, ObjectNode>> advancedFeatureData = new FlinkKafkaConsumer<>(
    ADVANCED_FEATURES_TOPIC,
    new TypeInformationKeyValueSerializationSchema<MyClass, ObjectNode>(
        TypeInformation.of(new TypeHint<MyClass>() {}),
        TypeInformation.of(new TypeHint<ObjectNode>() {}),
        env.getConfig()),
    properties);
However, I get the error:
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 121
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:112)
    at org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:43)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:718)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)

Is there something I am missing with my approach or am I supposed to use a completely different class than TypeInformationKeyValueSerializationSchema?

Re: Different deserialization schemas for Kafka keys and values

rmetzger0
Hi,

Check out the KafkaDeserializationSchema (https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#the-deserializationschema), which gives you access to both the key and the value bytes coming from Kafka, so you can deserialize each of them however you need.
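
For readers following along, here is a minimal sketch of what such a schema might look like. Some background on the error: TypeInformationKeyValueSerializationSchema decodes bytes with Flink's own type serializers, so it only works for records that were written with those serializers, which is likely why Kryo fails on a custom-format key. Everything specific below is an assumption and not taken from the thread: the class name KeyValueSchema, the nested MyClass stand-in with its fromBytes helper (it simply treats the key bytes as a UTF-8 string; the real custom format would go there), and the use of Flink's shaded Jackson ObjectNode.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KeyValueSchema
        implements KafkaDeserializationSchema<Tuple2<KeyValueSchema.MyClass, ObjectNode>> {

    /** Hypothetical stand-in for the poster's key class. */
    public static class MyClass {
        public String id;

        public MyClass() {}

        public MyClass(String id) {
            this.id = id;
        }

        /** Assumption: the "custom format" is plain UTF-8 text; replace with the real decoder. */
        public static MyClass fromBytes(byte[] bytes) {
            return new MyClass(new String(bytes, StandardCharsets.UTF_8));
        }
    }

    private static final long serialVersionUID = 1L;

    // Created lazily on the task side instead of being shipped with the job graph.
    private transient ObjectMapper mapper;

    @Override
    public Tuple2<MyClass, ObjectNode> deserialize(ConsumerRecord<byte[], byte[]> record)
            throws IOException {
        // Flink tuples cannot hold null fields, so skip records without a key or value.
        // Returning null tells the Kafka consumer to drop the record.
        if (record.key() == null || record.value() == null) {
            return null;
        }
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        // Key: custom format. Value: JSON object.
        MyClass key = MyClass.fromBytes(record.key());
        ObjectNode value = mapper.readValue(record.value(), ObjectNode.class);
        return Tuple2.of(key, value);
    }

    @Override
    public boolean isEndOfStream(Tuple2<MyClass, ObjectNode> nextElement) {
        return false; // the topic is treated as unbounded
    }

    @Override
    public TypeInformation<Tuple2<MyClass, ObjectNode>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<MyClass, ObjectNode>>() {});
    }
}
```

With a class like this, the consumer from the question could presumably be created as new FlinkKafkaConsumer<>(ADVANCED_FEATURES_TOPIC, new KeyValueSchema(), properties), since FlinkKafkaConsumer has a constructor that accepts a KafkaDeserializationSchema.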

Best,
Robert


On Thu, Aug 27, 2020 at 1:56 PM Manas Kale <[hidden email]> wrote:
> [original message quoted in full; identical to the first post above]


Re: Different deserialization schemas for Kafka keys and values

Manas Kale
Hi Robert,
Thanks for the info!

On Thu, Aug 27, 2020 at 8:01 PM Robert Metzger <[hidden email]> wrote:
> [reply and original message quoted in full; identical to the posts above]