Hello,

I was looking for documentation in 1.9.1 on how to create implementations of the KafkaSerializationSchema and KafkaDeserializationSchema interfaces. I have created implementations in the past for the SerializationSchema and DeserializationSchema interfaces. Unfortunately, I can find no examples, the code contains no documentation for this purpose, and some information appears to be missing.

Can someone please answer the following:

1) When creating a ProducerRecord in the KafkaSerializationSchema.serialize() method, how is the topic String supposed to be obtained by the implementing class? All of the constructors require that the topic be specified, but the topic is not passed in. Is there another interface that should be implemented to get the topic, or a callback? Or is it expected that the topic be fixed in the interface's implementation class? Some of the constructors also ask for a partition. Again, where is this information expected to come from?

2) The interfaces specify that ConsumerRecord<byte[], byte[]> is received and ProducerRecord<byte[], byte[]> is to be generated. What are the two byte arrays referencing in the type definitions?

Thanks,
Jason
Hi Jason,

The topic is passed to the FlinkKafkaConsumer constructor, followed by the KafkaDeserializationSchema and then the Properties:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html

new FlinkKafkaConsumer(kafkaTopic, new MessageDeserializer, kafkaProperties)
...
class MessageDeserializer extends KafkaDeserializationSchema[GenericRecord] {

On Thu, Jan 23, 2020 at 1:20 AM Jason Kania <[hidden email]> wrote:
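To make the shape of that snippet concrete, and to address question 2: the two byte arrays in ConsumerRecord<byte[], byte[]> are the raw Kafka message key and message value. Below is a minimal, self-contained Java sketch of a deserializer in the shape of KafkaDeserializationSchema; note that ConsumerRecord here is a simplified stand-in for the Kafka client class (so the sketch compiles on its own), not the real API.

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for org.apache.kafka.clients.consumer.ConsumerRecord:
// Kafka hands the connector the raw message key and value as byte arrays.
class ConsumerRecord<K, V> {
    final String topic;
    final K key;
    final V value;

    ConsumerRecord(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Shaped like KafkaDeserializationSchema<String>: turn the raw value bytes
// into the element type the rest of the pipeline works with.
class MessageDeserializer {
    String deserialize(ConsumerRecord<byte[], byte[]> record) {
        return new String(record.value, StandardCharsets.UTF_8);
    }
}

public class DeserializerSketch {
    public static void main(String[] args) {
        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
                "my-topic",
                "key-1".getBytes(StandardCharsets.UTF_8),
                "hello".getBytes(StandardCharsets.UTF_8));
        String element = new MessageDeserializer().deserialize(record);
        System.out.println(element); // prints "hello"
    }
}
```

In the real interface, deserialize() is where you would apply whatever decoding (Avro, JSON, etc.) matches how the producer wrote the key and value bytes.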
Thanks for responding. I am aware of where the topic is used. What I do not see is how to set the topic within the class that implements the KafkaSerializationSchema.serialize( T classObject, Long timestamp ) method.

The method must create and return a value of type ProducerRecord<byte[], byte[]>, but all the constructors for ProducerRecord expect "String topic" as the first argument. This is not passed to the method, so the question is: where is the implementing class supposed to get the topic?
On Wednesday, January 22, 2020, 08:29:49 p.m. EST, David Magalhães <[hidden email]> wrote:
That's a fair question; the interface is indeed weird in this regard and does have some issues.

From what I can tell you have 2 options:
a) have the user pass the topic to the serialization schema constructor, which in practice would be identical to the topic they pass to the producer.
b) additionally implement KafkaContextAware, and have the schema determine the topic _somehow_.

Both options are quite "eh" from a usability perspective:
a) requires the user to pass the same information around to multiple places,
b) is inefficient since the topic has to be determined twice per record (once when KafkaContextAware#getTargetTopic is called, once again when serialize is called), and can maybe(?) result in subtle issues if the 2 calls determine different topics.

The Table API version of the sink handles this better, since its serialization schema only returns a byte array, not a producer record.

You could also use one of the deprecated constructors that accept a "SerializationSchema" or "KeyedSerializationSchema", which handle this case better.

I've CC'd Aljoscha, who was involved in the introduction of the current iteration of the schema.
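As an illustration of option (a), here is a self-contained Java sketch in the shape of KafkaSerializationSchema<T>. ProducerRecord here is a simplified stand-in for the Kafka client class so the sketch compiles on its own, and the schema and topic names are invented for the example.

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for org.apache.kafka.clients.producer.ProducerRecord;
// in a real job you would import the Kafka client class instead.
class ProducerRecord<K, V> {
    final String topic;
    final K key;
    final V value;

    ProducerRecord(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Option (a): the topic is fixed once, in the schema's constructor, and
// reused for every record. This mirrors KafkaSerializationSchema<String>,
// whose serialize() also receives a nullable timestamp.
class FixedTopicSchema {
    private final String topic;

    FixedTopicSchema(String topic) {
        this.topic = topic;
    }

    ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        // Null key: Kafka's default partitioner then spreads records
        // across partitions on its own.
        return new ProducerRecord<>(topic, null,
                element.getBytes(StandardCharsets.UTF_8));
    }
}

public class SerializerSketch {
    public static void main(String[] args) {
        FixedTopicSchema schema = new FixedTopicSchema("events");
        ProducerRecord<byte[], byte[]> record = schema.serialize("hello", 42L);
        System.out.println(record.topic); // prints "events"
    }
}
```

The downside Chesnay mentions is visible here: "events" has to match whatever topic the surrounding FlinkKafkaProducer is configured with, so the same string ends up in two places.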
On 23/01/2020 03:13, Jason Kania wrote:
Hi,
the reason the new schema feels a bit weird is that it implements a new paradigm in a FlinkKafkaProducer that still follows a somewhat older paradigm. In the old paradigm, partitioning and topic were configured on the sink, which made them fixed for all produced records. The new schema allows setting the partition and topic dynamically, which, as I said, doesn't play well with the old way.

As Chesnay said, you can either pass the topic to the schema, making it fixed for all records, or encode it in the data and then forward it to the record.

Best,
Aljoscha

On 23.01.20 11:35, Chesnay Schepler wrote:
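A sketch of the second option Aljoscha describes: the destination topic travels inside the element itself, and the schema forwards it into the record. As before, ProducerRecord is a simplified stand-in for the Kafka client class, and the Event type is invented for the example.

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for org.apache.kafka.clients.producer.ProducerRecord.
class ProducerRecord<K, V> {
    final String topic;
    final K key;
    final V value;

    ProducerRecord(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Invented element type that carries its own destination topic.
class Event {
    final String destinationTopic;
    final String payload;

    Event(String destinationTopic, String payload) {
        this.destinationTopic = destinationTopic;
        this.payload = payload;
    }
}

// Shaped like KafkaSerializationSchema<Event>: the topic is read from the
// element, so different records can go to different topics.
class DynamicTopicSchema {
    ProducerRecord<byte[], byte[]> serialize(Event element, Long timestamp) {
        return new ProducerRecord<>(element.destinationTopic, null,
                element.payload.getBytes(StandardCharsets.UTF_8));
    }
}

public class DynamicTopicSketch {
    public static void main(String[] args) {
        DynamicTopicSchema schema = new DynamicTopicSchema();
        ProducerRecord<byte[], byte[]> r =
                schema.serialize(new Event("alerts", "cpu high"), null);
        System.out.println(r.topic); // prints "alerts"
    }
}
```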