Stumped writing to KafkaJSONSink

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Stumped writing to KafkaJSONSink

Kenny Gorman
I am hoping you guys can help me. I am stumped how to actually write to Kafka using Kafka09JsonTableSink using the Table API. Here is my code below, I am hoping you guys can shed some light on how this should be done. I don’t see any methods for the actual write to Kafka. I am probably doing something stupid. TIA.

Thanks!
Kenny

// run some SQL to filter results where a key is not null
String sql = "SELECT icao FROM flights WHERE icao is not null";
tableEnv.registerTableSource("flights", kafkaTableSource);
Table result = tableEnv.sql(sql);

// create a partition for the data going into kafka
FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();

// create new tablesink of JSON to kafka
KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
        params.getRequired("write-topic"),
        params.getProperties(),
        partition);

result.writeToSink(tableEnv, kafkaTableSink);  // Logically, I want to do this, but no such method..
Reply | Threaded
Open this post in threaded view
|

Re: Stumped writing to KafkaJSONSink

Fabian Hueske-2
Hi Kenny,

this look almost correct.
The Table class has a method writeToSink(TableSink) that should address your use case (so the same as yours but without the TableEnvironment argument).

Does that work for you?
If not what kind of error and error message do you get?

Best, Fabian

2017-10-18 1:28 GMT+02:00 Kenny Gorman <[hidden email]>:
I am hoping you guys can help me. I am stumped how to actually write to Kafka using Kafka09JsonTableSink using the Table API. Here is my code below, I am hoping you guys can shed some light on how this should be done. I don’t see any methods for the actual write to Kafka. I am probably doing something stupid. TIA.

Thanks!
Kenny

// run some SQL to filter results where a key is not null
String sql = "SELECT icao FROM flights WHERE icao is not null";
tableEnv.registerTableSource("flights", kafkaTableSource);
Table result = tableEnv.sql(sql);

// create a partition for the data going into kafka
FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();

// create new tablesink of JSON to kafka
KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
        params.getRequired("write-topic"),
        params.getProperties(),
        partition);

result.writeToSink(tableEnv, kafkaTableSink);  // Logically, I want to do this, but no such method..

Reply | Threaded
Open this post in threaded view
|

Re: Stumped writing to KafkaJSONSink

Kenny Gorman
Yep we hung out and got it working. I should have replied sooner! Thx for the reply.

-kg

On Oct 18, 2017, at 7:06 AM, Fabian Hueske <[hidden email]> wrote:

Hi Kenny,

this look almost correct.
The Table class has a method writeToSink(TableSink) that should address your use case (so the same as yours but without the TableEnvironment argument).

Does that work for you?
If not what kind of error and error message do you get?

Best, Fabian

2017-10-18 1:28 GMT+02:00 Kenny Gorman <[hidden email]>:
I am hoping you guys can help me. I am stumped how to actually write to Kafka using Kafka09JsonTableSink using the Table API. Here is my code below, I am hoping you guys can shed some light on how this should be done. I don’t see any methods for the actual write to Kafka. I am probably doing something stupid. TIA.

Thanks!
Kenny

// run some SQL to filter results where a key is not null
String sql = "SELECT icao FROM flights WHERE icao is not null";
tableEnv.registerTableSource("flights", kafkaTableSource);
Table result = tableEnv.sql(sql);

// create a partition for the data going into kafka
FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();

// create new tablesink of JSON to kafka
KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
        params.getRequired("write-topic"),
        params.getProperties(),
        partition);

result.writeToSink(tableEnv, kafkaTableSink);  // Logically, I want to do this, but no such method..

Reply | Threaded
Open this post in threaded view
|

Re: Stumped writing to KafkaJSONSink

Fabian Hueske-2
No worries :-) Thanks for the notice.

2017-10-18 15:07 GMT+02:00 Kenny Gorman <[hidden email]>:
Yep we hung out and got it working. I should have replied sooner! Thx for the reply.

-kg

On Oct 18, 2017, at 7:06 AM, Fabian Hueske <[hidden email]> wrote:

Hi Kenny,

this look almost correct.
The Table class has a method writeToSink(TableSink) that should address your use case (so the same as yours but without the TableEnvironment argument).

Does that work for you?
If not what kind of error and error message do you get?

Best, Fabian

2017-10-18 1:28 GMT+02:00 Kenny Gorman <[hidden email]>:
I am hoping you guys can help me. I am stumped how to actually write to Kafka using Kafka09JsonTableSink using the Table API. Here is my code below, I am hoping you guys can shed some light on how this should be done. I don’t see any methods for the actual write to Kafka. I am probably doing something stupid. TIA.

Thanks!
Kenny

// run some SQL to filter results where a key is not null
String sql = "SELECT icao FROM flights WHERE icao is not null";
tableEnv.registerTableSource("flights", kafkaTableSource);
Table result = tableEnv.sql(sql);

// create a partition for the data going into kafka
FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();

// create new tablesink of JSON to kafka
KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
        params.getRequired("write-topic"),
        params.getProperties(),
        partition);

result.writeToSink(tableEnv, kafkaTableSink);  // Logically, I want to do this, but no such method..