I am hoping you guys can help me. I am stumped on how to actually write to Kafka with Kafka09JsonTableSink using the Table API. Here is my code below; I am hoping you can shed some light on how this should be done. I don't see any method for the actual write to Kafka. I am probably doing something stupid. TIA.
// run some SQL to filter results where a key is not null
String sql = "SELECT icao FROM flights WHERE icao is not null";
tableEnv.registerTableSource("flights", kafkaTableSource);
Table result = tableEnv.sql(sql);

// create a partition for the data going into kafka
FlinkFixedPartitioner partition = new FlinkFixedPartitioner();

// create new tablesink of JSON to kafka
KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
    params.getRequired("write-topic"),
    params.getProperties(),
    partition);

result.writeToSink(tableEnv, kafkaTableSink); // Logically, I want to do this, but no such method..

Thanks!
Kenny
Hi Kenny, this looks almost correct. If it does not work, what kind of error and error message do you get?

2017-10-18 1:28 GMT+02:00 Kenny Gorman <[hidden email]>:
> I am hoping you guys can help me. I am stumped how to actually write to Kafka using Kafka09JsonTableSink using the Table API. Here is my code below, I am hoping you guys can shed some light on how this should be done. I don’t see any methods for the actual write to Kafka. I am probably doing something stupid. TIA.
Yep we hung out and got it working. I should have replied sooner! Thx for the reply.
-kg
No worries :-) Thanks for the notice. 2017-10-18 15:07 GMT+02:00 Kenny Gorman <[hidden email]>: