Hi Agnelo,How is the writer schema encoded if you are using no schema registry? Or phrased differently: how does Flink know with which schema the data has been written so that it can map it to the new schema?On Wed, Apr 14, 2021 at 8:33 AM Agnelo Dcosta <[hidden email]> wrote:Hi, we are using Flink SQL 1.12 and have a couple of tables created from kafka topics. Format is avro (not confluent avro) and no schema registry as such.
In flink 1.11 we used to specify the schema, however in 1.12 the schema is derived from the message itself.
Is it possible for the producers to start sending new fields without changes in the flink app?
For example :
{
"name": "topic1",
"type": "record",
"fields": [
{
"name": "field1",
"type": "string"
},
{
"name": "field2",
"type": "string"
},
{
"name": "field3",
"type": "string"
},
]
}
Flink table has:
CREATE TABLE topic1(\n"
+ " field1 string not null \n"
+ " ,field2 string not null \n"
"'connector' = 'kafka' \n"
+ ",'topic' = 'topic1' \n"
+ ",'scan.startup.mode' = 'latest-offset' \n"
+ ",'properties.group.id' = 'topic1' \n"
+ ",'properties.bootstrap.servers' = 'localhost:8082' \n"
+ ",'properties.enable.auto.commit' = 'true' \n"
+ ",'format' = 'avro' \n";
With above settings I get a deserialization error:
java.io.IOException: Failed to deserialize Avro record.
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104) ~[flink-sql-avro-1.12.0.jar:1.12.0]
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44) ~[flink-sql-avro-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-core-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
Free forum by Nabble | Edit this page |