Hi, we are using Flink SQL 1.12 and have a couple of tables created from Kafka topics. The format is Avro (not Confluent Avro) and there is no schema registry as such. In Flink 1.11 we used to specify the schema explicitly; in 1.12 the schema is derived from the table definition instead. Is it possible for the producers to start sending new fields without changes in the Flink app?
For example, the producers' Avro schema:

{
  "name": "topic1",
  "type": "record",
  "fields": [
    { "name": "field1", "type": "string" },
    { "name": "field2", "type": "string" },
    { "name": "field3", "type": "string" }
  ]
}
The Flink table has:

CREATE TABLE topic1 (
  field1 STRING NOT NULL,
  field2 STRING NOT NULL
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic1',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'topic1',
  'properties.bootstrap.servers' = 'localhost:8082',
  'properties.enable.auto.commit' = 'true',
  'format' = 'avro'
);
With the above settings I get a deserialization error:
java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104) ~[flink-sql-avro-1.12.0.jar:1.12.0]
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44) ~[flink-sql-avro-1.12.0.jar:1.12.0]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-core-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
Hi Agnelo,

How is the writer schema encoded if you are using no schema registry? Or phrased differently: how does Flink know with which schema the data has been written so that it can map it to the new schema?

On Wed, Apr 14, 2021 at 8:33 AM Agnelo Dcosta <[hidden email]> wrote:
Hi Arvid,

> How is the writer schema encoded if you are using no schema registry?

On the producer side we are using Node with the https://registry.npmjs.org/avsc/-/avsc-5.5.3.tgz and https://registry.npmjs.org/sinek/-/sinek-9.1.0.tgz libraries to publish messages. We specify the Avro schema file to encode messages in Avro format.

> How does Flink know with which schema the data has been written so that it can map it to the new schema?

With 1.11 we used to specify the schema file as part of the Flink SQL table definition. However, with 1.12 the schema is derived from the message/table definition; we do not specify any schema as such.

On Tue, Apr 13, 2021 at 11:58 PM Arvid Heise <[hidden email]> wrote:
For any schema change to be gracefully supported, you need to know both schemas (old + new) on the reader side (= Flink). I'm curious how Flink should know the old schema, as you only provide the new schema, right?

Usually, you use the schema registry, such that each record has its own schema attached to it (= old). You opted not to go for it, so I'm not sure how Flink is supposed to know the old schema.

On Wed, Apr 14, 2021 at 9:09 AM Agnelo Dcosta <[hidden email]> wrote:
Hi Arvid, thanks for the reply.

We are following the 1.12 documentation here: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro.html#data-type-mapping
"Currently, the Avro schema is always derived from table schema. Explicitly defining an Avro schema is not supported yet."

The issue is that we need our producers to start using a newer topic schema (one additional column). Flink will also get this new column eventually. However, in the time between deploying the producer changes and the Flink changes, Flink will crash with a failure to deserialize. We are trying to see if there is any setting that would let Flink continue reading the new schema without having that field specified in the table definition.

On Wed, Apr 14, 2021 at 12:46 AM Arvid Heise <[hidden email]> wrote:
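For illustration, a sketch of the table definition once the Flink side catches up, i.e. with the producers' new field3 added (assuming it maps to a nullable STRING; all connector options are unchanged from the definition above):

CREATE TABLE topic1 (
  field1 STRING NOT NULL,
  field2 STRING NOT NULL,
  -- new column from the producers' updated Avro schema;
  -- left nullable here as an assumption
  field3 STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic1',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'topic1',
  'properties.bootstrap.servers' = 'localhost:8082',
  'properties.enable.auto.commit' = 'true',
  'format' = 'avro'
);

With the plain 'avro' format this only helps once every record being read was written with the three-field schema; a topic containing a mix of old and new records would still fail to deserialize, which is what the next reply addresses.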
Hi Agnelo,

If you reprocess all data and delete all old records with the old schema, then you just have to add the schema to the DDL and it will work.

If you have records with the old and the new schema in your topic, you need to attach the schema information to the records. Avro records themselves do not carry any metadata, and Flink (or any other consumer) cannot convert them in any way. The usual approach to attach the schema information is to use the schema registry, which Flink also supports.

On Wed, Apr 14, 2021 at 5:22 PM Agnelo Dcosta <[hidden email]> wrote:
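A minimal sketch of the registry-backed variant described above, assuming a Confluent Schema Registry is reachable at http://localhost:8081 (a placeholder URL) and the producers switch to the Confluent wire format (magic byte + schema id) so the writer schema of each record can be looked up:

CREATE TABLE topic1 (
  field1 STRING NOT NULL,
  field2 STRING NOT NULL
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic1',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'topic1',
  'properties.bootstrap.servers' = 'localhost:8082',
  -- 'avro-confluent' resolves the writer schema id embedded in each record
  -- against the registry, so writer and reader (table) schemas can differ
  -- in compatible ways
  'format' = 'avro-confluent',
  -- placeholder; point this at your schema registry
  'avro-confluent.schema-registry.url' = 'http://localhost:8081'
);

With this setup the producers can move to the new schema ahead of the Flink table: the extra field is simply ignored during deserialization until the column is added to the DDL. Note that the plain avsc encoding used on the producer side does not produce the Confluent wire format by itself, so this approach implies a change on both sides.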