Kafka Avro Table Source

Posted by Will Du
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Existing-files-and-directories-are-not-overwritten-in-NO-OVERWRITE-mode-Use-OVERWRITE-mode-to-overwr-tp21130p21140.html

Hi folks,
I am working on mapping an Avro table source to a Kafka source. Looking at the example below, I think the current Flink 1.5.0 connector is not flexible enough: it seems I have to specify the Avro record class in order to read from Kafka.

For withSchema, the schema can be fetched from the schema registry. However, the Avro record class appears to be hard-coded.

thanks,
Will

KafkaTableSource source = Kafka010AvroTableSource.builder()
  // set Kafka topic
  .forTopic("sensors")
  // set Kafka consumer properties
  .withKafkaProperties(kafkaProps)
  // set Table schema
  .withSchema(TableSchema.builder()
    .field("sensorId", Types.LONG())
    .field("temp", Types.DOUBLE())
    .field("time", Types.SQL_TIMESTAMP()).build())
  // set class of Avro record
  .forAvroRecordClass(SensorReading.class)  // ? Any way to set this without hard-coding the class?
  .build();
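One partial workaround (a sketch only, not a Flink API): keep the generated Avro class on the classpath but resolve its name from configuration at runtime via reflection, so the class literal is not compiled into the job. The property name `avro.record.class` below is hypothetical, and `java.lang.String` stands in for a generated record class like `SensorReading` just so the snippet is self-contained.

```java
import java.util.Properties;

public class RecordClassResolver {

    // Load the Avro record class named in the job properties instead of
    // hard-coding a class literal. "avro.record.class" is a made-up key.
    static Class<?> resolveRecordClass(Properties props) throws ClassNotFoundException {
        String className = props.getProperty("avro.record.class");
        return Class.forName(className);
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Stand-in for e.g. "com.example.SensorReading"
        props.setProperty("avro.record.class", "java.lang.String");
        System.out.println(resolveRecordClass(props).getName());
    }
}
```

The resolved class could then be passed to `forAvroRecordClass(...)`. This only moves the class name out of the source code; the generated class must still exist at runtime, so it does not give a fully schema-registry-driven (GenericRecord-style) source.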