Confluent schema registry Kafka client for sql-client.sh

Ruurtjan Pul
Hi there,

I'm not sure if this is possible, because it doesn't seem to be documented anywhere, so here we go...

I've got a Kafka topic with Avro encoded records that have been produced with a Confluent schema registry Kafka client. Note that these Avro records are slightly different from vanilla Avro messages, as the Confluent producer adds (prepends?) some metadata that encodes the schema version that's used for encoding the record. So you can't just decode it with a regular Avro client, or you'll get:
> Flink SQL> select * from SomeTopic limit 10;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
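
For reference, the metadata in question is the Confluent wire format: a single magic byte (0x00) followed by a 4-byte big-endian schema registry id, and only after those five bytes does the Avro payload start. A minimal standalone sketch (plain Java, not Flink code) of that framing, which shows why a vanilla Avro decoder starting at offset 0 fails:

```java
import java.nio.ByteBuffer;

public final class ConfluentWireFormat {
    public static void main(String[] args) {
        // Pretend these bytes came off the Kafka topic (schema id 42 here).
        byte[] record = new byte[] {0x00, 0x00, 0x00, 0x00, 0x2A /*, Avro payload bytes... */};

        ByteBuffer buffer = ByteBuffer.wrap(record);
        byte magic = buffer.get();      // always 0x00 in the Confluent wire format
        int schemaId = buffer.getInt(); // registry id of the writer schema, big-endian
        // Everything from buffer.position() onward is the actual Avro-encoded
        // payload. A plain Avro decoder that starts at offset 0 misreads the
        // 5-byte prefix as Avro data, hence "Malformed data. Length is negative".
        System.out.println("magic=" + magic + ", schemaId=" + schemaId);
    }
}
```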

Now, I'd like to read this topic in a sql-client.sh session, but can't seem to work out how to do so. There is a ConfluentRegistryAvroDeserializationSchema, for which I can only find a Flink jar, not a Flink SQL jar. There are also the `flink-sql-connector-kafka_2.12-1.10.0.jar` and `flink-avro-1.10.0-sql-jar.jar` jars, but they suffer from the `Malformed data` issue I mentioned before.

Is there any way to read from a Confluent schema registry Avro encoded Kafka topic in a Flink SQL session? (Try saying that three times fast ;))

Best,
Ruurtjan

Re: Confluent schema registry Kafka client for sql-client.sh

Jark Wu
Hi Ruurtjan,

Thanks for reporting this. There is already an issue tracking support for this feature: https://issues.apache.org/jira/browse/FLINK-16048
Currently, Flink SQL only supports the JSON, CSV, and standard Avro formats.
The Confluent Avro format is not supported yet, but it is definitely on our roadmap with high priority.

As a workaround, you can implement a DeserializationSchemaFactory for Confluent Avro using the existing ConfluentRegistryAvroDeserializationSchema, and register the factory's fully qualified class name in "META-INF/services/org.apache.flink.table.factories.TableFactory" (under resources) so it can be discovered via SPI. You can take AvroRowFormatFactory as an example.
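
To make that concrete, here is a minimal sketch of such a factory, assuming the Flink 1.10 table factory API. The format identifier "confluent-avro", the package, and the property keys are made up for illustration, and a production version would still have to convert the GenericRecords to Rows (the way AvroRowDeserializationSchema does), since the SQL runtime consumes a DeserializationSchema<Row>:

```java
package com.example; // hypothetical package for this sketch

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.TableFormatFactoryBase;

public class ConfluentAvroFormatFactory
        extends TableFormatFactoryBase<GenericRecord>
        implements DeserializationSchemaFactory<GenericRecord> {

    public ConfluentAvroFormatFactory() {
        // "confluent-avro" is a made-up format type for this sketch.
        super("confluent-avro", 1, true);
    }

    @Override
    protected List<String> supportedFormatProperties() {
        List<String> properties = new ArrayList<>();
        properties.add("format.schema-registry.url"); // hypothetical property key
        properties.add("format.avro-schema");         // hypothetical property key
        return properties;
    }

    @Override
    public DeserializationSchema<GenericRecord> createDeserializationSchema(
            Map<String, String> properties) {
        Schema readerSchema = new Schema.Parser().parse(properties.get("format.avro-schema"));
        String registryUrl = properties.get("format.schema-registry.url");
        // Strips the magic byte + schema id prefix and fetches the writer
        // schema from the registry before decoding the Avro payload.
        return ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, registryUrl);
    }
}
```

The SPI registration file, META-INF/services/org.apache.flink.table.factories.TableFactory, would then contain the single line `com.example.ConfluentAvroFormatFactory`.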

Best,
Jark

