Hi there,
I have the following use case:
I have data coming from Kafka that I need to stream and write each message to a database. I'm using the Flink Kafka connector for streaming data from Kafka. I don't want to use Flink sinks to write data from the stream.
I'm doing the following, which doesn't seem to work:
messageStream
How can I iterate over each message in the stream and do something with that message?
Thanks
|
Hi folks,
I am trying to consume Avro data from Kafka in Flink. The data is produced by Kafka Connect using AvroConverter. I have created an AvroDeserializationSchema.java that is used by the Flink consumer, and I read the data with the following code:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("zookeeper.connect", "localhost:2181");

    Schema schema = new Parser().parse("{"
        + "\"name\": \"test\", "
        + "\"type\": \"record\", "
        + "\"fields\": ["
        + "  { \"name\": \"name\", \"type\": \"string\" },"
        + "  { \"name\": \"symbol\", \"type\": \"string\" },"
        + "  { \"name\": \"exchange\", \"type\": \"string\" }"
        + "]}");

    AvroDeserializationSchema<GenericRecord> avroSchema = new AvroDeserializationSchema<>(schema);

    FlinkKafkaConsumer09<GenericRecord> kafkaConsumer =
        new FlinkKafkaConsumer09<>("myavrotopic", avroSchema, properties);

    DataStream<GenericRecord> messageStream = env.addSource(kafkaConsumer);
    messageStream.rebalance().print();

    env.execute("Flink AVRO KAFKA Test");
}

Once I run the code, I only get the schema's field names back; every value is empty:

{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}

Could anyone help me find out why I cannot decode it?

While troubleshooting further, I found that if I send the Avro data with a plain Kafka producer using kafka.serializer.DefaultEncoder, the code above produces the correct result. Does anybody know how to either set DefaultEncoder in Kafka Connect, or set it when writing a custom Kafka Connect connector? Alternatively, how should I modify AvroDeserializationSchema.java instead?

Thanks. I'll post this to the Kafka user group as well.

Will
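For reference, a deserialization schema along these lines might look like the sketch below. This is only an assumption about what such a class could contain (the AvroDeserializationSchema.java from the post is not shown), written against the Flink APIs of the FlinkKafkaConsumer09 era, and it decodes a raw Avro binary payload with no extra framing. Kafka Connect's AvroConverter normally writes the Confluent wire format (a magic byte plus a 4-byte schema id in front of the Avro bytes), so a plain decoder like this would start reading at the wrong offset for such messages.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class AvroDeserializationSchema implements DeserializationSchema<GenericRecord> {

    // Keep the schema as a String: Schema itself is not Serializable in older Avro versions.
    private final String schemaString;
    private transient Schema schema;
    private transient GenericDatumReader<GenericRecord> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Schema schema) {
        this.schemaString = schema.toString();
    }

    @Override
    public GenericRecord deserialize(byte[] message) {
        if (reader == null) {
            schema = new Schema.Parser().parse(schemaString);
            reader = new GenericDatumReader<>(schema);
        }
        // Decode the raw Avro binary payload into a GenericRecord.
        decoder = DecoderFactory.get().binaryDecoder(message, decoder);
        try {
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize Avro record", e);
        }
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }
}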
|
In reply to this post by Sandeep Vakacharla
Hi,

a MapFunction should be the way to go for this use case.

2016-11-03 0:00 GMT+01:00 Sandeep Vakacharla <[hidden email]>:
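A rough sketch of that approach, assuming messageStream is a DataStream<String> and using made-up JDBC connection details and table name, could look like the following. A RichMapFunction is used here so the connection is opened once per parallel task in open() rather than once per record:

// Requires: org.apache.flink.api.common.functions.RichMapFunction,
// org.apache.flink.configuration.Configuration, java.sql.*
messageStream
    .map(new RichMapFunction<String, String>() {

        private transient Connection connection;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Hypothetical connection details; one connection per parallel task.
            connection = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/mydb", "user", "password");
        }

        @Override
        public String map(String message) throws Exception {
            // Write each incoming Kafka message to a (hypothetical) messages table.
            try (PreparedStatement stmt =
                     connection.prepareStatement("INSERT INTO messages (payload) VALUES (?)")) {
                stmt.setString(1, message);
                stmt.executeUpdate();
            }
            return message;
        }

        @Override
        public void close() throws Exception {
            if (connection != null) {
                connection.close();
            }
        }
    });

If nothing downstream needs the mapped value, a RichSinkFunction with the same open/close pattern works the same way and makes the intent clearer.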
|