Usage of KafkaDeserializationSchema and KafkaSerializationSchema


Jason Kania
Hello,

I was looking for documentation in 1.9.1 on how to create implementations of the KafkaSerializationSchema and KafkaDeserializationSchema interfaces. I have created implementations of the SerializationSchema and DeserializationSchema interfaces in the past. Unfortunately, I can find no examples, the code contains no documentation for this purpose, and some information appears to be missing.

Can someone please answer the following:

1) When creating a ProducerRecord in the KafkaSerializationSchema.serialize() method, how is the implementing class supposed to obtain the topic String? All of the ProducerRecord constructors require the topic, but the topic is not passed in. Is there another interface that should be implemented to get the topic, or a callback? Or is it expected that the topic is fixed in the implementation class? Some of the constructors also ask for a partition. Again, where is this information expected to come from?

2) The interfaces specify that a ConsumerRecord<byte[], byte[]> is received and a ProducerRecord<byte[], byte[]> is to be generated. What do the two byte arrays in these type definitions refer to?

Thanks,

Jason

Re: Usage of KafkaDeserializationSchema and KafkaSerializationSchema

David Magalhães
Hi Jason,

The topic is passed to FlinkKafkaConsumer as the first constructor argument, followed by the KafkaDeserializationSchema and then the Properties.

https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html

new FlinkKafkaConsumer[GenericRecord](kafkaTopic, new MessageDeserializer, kafkaProperties)
...
class MessageDeserializer extends KafkaDeserializationSchema[GenericRecord] {
  // isEndOfStream, deserialize and getProducedType are implemented here
}
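On the second question, which the thread does not answer directly: in Kafka's `ConsumerRecord<K, V>` and `ProducerRecord<K, V>`, `K` is the key type and `V` the value type, so the two byte arrays are the raw record key and the raw record value. A minimal consumer-side sketch, assuming Flink 1.9 with the universal Kafka connector and `flink-scala` on the classpath; `KeyedMessage`, `ConsumerWiring` and the UTF-8 string payloads are hypothetical stand-ins for the Avro `GenericRecord` decoding in the snippet above.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord

// Hypothetical output type: the decoded key and value plus where the record came from.
case class KeyedMessage(topic: String, partition: Int, key: Option[String], value: Option[String])

class MessageDeserializer extends KafkaDeserializationSchema[KeyedMessage] {

  // Kafka keys and values may both be null (unkeyed records, tombstones).
  private def decode(bytes: Array[Byte]): Option[String] =
    Option(bytes).map(new String(_, UTF_8))

  // An unbounded Kafka source: never signal end of stream.
  override def isEndOfStream(nextElement: KeyedMessage): Boolean = false

  // The two byte arrays of ConsumerRecord[Array[Byte], Array[Byte]] are key() and value().
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): KeyedMessage =
    KeyedMessage(record.topic(), record.partition(), decode(record.key()), decode(record.value()))

  override def getProducedType: TypeInformation[KeyedMessage] = createTypeInformation[KeyedMessage]
}

object ConsumerWiring {
  // The topic is given to the consumer only; the schema never needs it.
  def build(kafkaTopic: String, kafkaProperties: Properties): FlinkKafkaConsumer[KeyedMessage] =
    new FlinkKafkaConsumer[KeyedMessage](kafkaTopic, new MessageDeserializer, kafkaProperties)
}
```

On the consumer side the schema can also read `record.topic()`, so a single deserializer works unchanged across several subscribed topics.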


On Thu, Jan 23, 2020 at 1:20 AM Jason Kania <[hidden email]> wrote:

Re: Usage of KafkaDeserializationSchema and KafkaSerializationSchema

Jason Kania
Thanks for responding.

I am aware of where the topic is used. What I do not see is how to set the topic within the class that implements the KafkaSerializationSchema.serialize(T classObject, Long timestamp) method.

The method must create and return a value of type ProducerRecord<byte[], byte[]>, but all the constructors of ProducerRecord expect "String topic" as the first argument. The topic is not passed to the method, so the question is: where is the implementing class supposed to get it from?

On Wednesday, January 22, 2020, 08:29:49 p.m. EST, David Magalhães <[hidden email]> wrote:



Re: Usage of KafkaDeserializationSchema and KafkaSerializationSchema

Chesnay Schepler
That's a fair question; the interface is indeed weird in this regard and does have some issues.

From what I can tell, you have two options:
a) have the user pass the topic to the serialization schema constructor, which in practice would be identical to the topic they pass to the producer.
b) Additionally implement KafkaContextAware, and have the schema determine the topic _somehow_.
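Option (a) could look roughly like this sketch, assuming Flink 1.9's universal `FlinkKafkaProducer`; `SensorReading`, `FixedTopicSchema`, `FixedTopicWiring` and the CSV-style encoding are hypothetical. The topic reaches the schema through its constructor and is handed to the producer a second time as its default topic. Passing `null` as the `ProducerRecord` partition leaves the choice to the Kafka producer's partitioner, which also touches on the partition question from the original post.

```scala
import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Properties

import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical event type, for illustration only.
case class SensorReading(sensorId: String, value: Double)

// Option (a): the topic is fixed when the schema is constructed.
class FixedTopicSchema(topic: String) extends KafkaSerializationSchema[SensorReading] {

  // timestamp is the Flink record timestamp and may be null.
  override def serialize(element: SensorReading, timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] = {
    val key = element.sensorId.getBytes(UTF_8)
    val value = s"${element.sensorId},${element.value}".getBytes(UTF_8)
    // A null partition lets the Kafka producer's partitioner pick one.
    new ProducerRecord[Array[Byte], Array[Byte]](topic, null, timestamp, key, value)
  }
}

object FixedTopicWiring {
  def build(topic: String, props: Properties): FlinkKafkaProducer[SensorReading] = {
    // The same topic is passed twice: to the schema and as the producer's default topic.
    new FlinkKafkaProducer[SensorReading](
      topic,
      new FixedTopicSchema(topic),
      props,
      FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
  }
}
```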

Both options are quite "eh" from a usability perspective:
a) requires the user to pass the same information around to multiple places;
b) is inefficient, since the topic has to be determined twice per record (once when KafkaContextAware#getTargetTopic is called and again when serialize is called), and could possibly result in subtle issues if the two calls determine different topics.
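Option (b) might be sketched as follows, again assuming Flink 1.9's universal connector; `LogLine`, `RoutingSchema` and the level-based routing rule are hypothetical. Deriving the topic from a single pure function of the element means `getTargetTopic` and `serialize` cannot disagree, though the topic is still computed twice per record.

```scala
import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Locale

import org.apache.flink.streaming.connectors.kafka.{KafkaContextAware, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical event type, for illustration only.
case class LogLine(level: String, message: String)

// Option (b): the schema picks the topic itself and reports it via KafkaContextAware.
class RoutingSchema(topicPrefix: String)
    extends KafkaSerializationSchema[LogLine]
    with KafkaContextAware[LogLine] {

  // Hypothetical routing rule; as a pure function of the element,
  // both methods below always agree on the topic.
  private def topicFor(line: LogLine): String =
    s"$topicPrefix-${line.level.toLowerCase(Locale.ROOT)}"

  // Asked by the producer for each record, separately from serialize().
  override def getTargetTopic(element: LogLine): String = topicFor(element)

  // Builds the actual record; the topic is computed a second time here.
  override def serialize(element: LogLine, timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](
      topicFor(element), null, timestamp, null, element.message.getBytes(UTF_8))
}
```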

The Table API version of the sink handles this better, since its serialization schema only returns a byte array and not a producer record.

You could also use one of the deprecated constructors that accept a "SerializationSchema" or "KeyedSerializationSchema", which handle this case better.
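A sketch of the `KeyedSerializationSchema` route, assuming the Flink 1.9 interface `org.apache.flink.streaming.util.serialization.KeyedSerializationSchema` and the producer constructor that takes it (deprecated, per the message above); `Order`, `OrderSchema` and `KeyedWiring` are hypothetical. The schema only produces key and value bytes, and returning `null` from `getTargetTopic` falls back to the topic given to the producer, so the topic is specified once.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Properties

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema

// Hypothetical event type, for illustration only.
case class Order(orderId: String, payload: String)

// Used through the deprecated producer constructor; the schema never needs the topic.
class OrderSchema extends KeyedSerializationSchema[Order] {
  override def serializeKey(element: Order): Array[Byte] = element.orderId.getBytes(UTF_8)
  override def serializeValue(element: Order): Array[Byte] = element.payload.getBytes(UTF_8)
  // null means "write to the producer's default topic".
  override def getTargetTopic(element: Order): String = null
}

object KeyedWiring {
  // The topic is specified exactly once, on the producer.
  def build(topic: String, props: Properties): FlinkKafkaProducer[Order] =
    new FlinkKafkaProducer[Order](topic, new OrderSchema, props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
}
```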

I've CC'd Aljoscha, who was involved in the introduction of the current iteration of the schema.

On 23/01/2020 03:13, Jason Kania wrote:

Re: Usage of KafkaDeserializationSchema and KafkaSerializationSchema

Aljoscha Krettek
Hi,

the reason the new schema feels a bit weird is that it implements a new
paradigm in a FlinkKafkaProducer that still follows a somewhat older
paradigm. In the old paradigm, the partitioning and the topic were
configured on the sink, which made them fixed for all produced records.
The new schema allows setting the partition and topic dynamically, which,
as I said, doesn't play well with the old way.

As Chesnay said, you can either pass the topic to the schema to make it
fixed for all records, or encode it in the data and then forward it to
the record.
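The second approach, encoding the topic in the data, might look like the sketch below, assuming Flink 1.9's `KafkaSerializationSchema`; `RoutedEvent` and `RoutedEventSchema` are hypothetical. The topic and an optional partition travel with each element and are forwarded into the `ProducerRecord`. The `FlinkKafkaProducer` constructors that take a `KafkaSerializationSchema` still expect a default topic, even though this schema never reads it.

```scala
import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets.UTF_8

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical event that carries its own routing information.
case class RoutedEvent(topic: String, partition: Option[Int], key: String, body: String)

class RoutedEventSchema extends KafkaSerializationSchema[RoutedEvent] {
  override def serialize(element: RoutedEvent, timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] = {
    // Forward the partition from the data; None becomes null, leaving the choice to Kafka.
    val partition: Integer = element.partition.map(p => Integer.valueOf(p)).orNull
    new ProducerRecord[Array[Byte], Array[Byte]](
      element.topic, partition, timestamp, element.key.getBytes(UTF_8), element.body.getBytes(UTF_8))
  }
}
```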

Best,
Aljoscha

On 23.01.20 11:35, Chesnay Schepler wrote:
