FlinkKafkaProducer usage

Posted by Boris Lublinsky on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/FlinkKafkaProducer-usage-tp11395.html

I am trying to write a quick sample of streaming word count using the Beam APIs and the FlinkBeamRunner.
The problem I am running into is that

apply("Write to Kafka", Write.to(UnboundedFlinkSink.of(kafkaSink)))

does not work here - Write assumes a bounded collection, and mine is unbounded.

I have not found an unbounded equivalent of Write, so I tried to implement a custom ParDo function:

/**
 * Writes content to Kafka.
 */
static class WriteToKafkaFn extends DoFn<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink;
    private boolean opened = false;

    public WriteToKafkaFn(FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink) {
        this.kafkaSink = kafkaSink;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        if (!opened) {
            kafkaSink.open(new Configuration());
            opened = true;
        }
        Tuple2<String, Integer> record = c.element();
        try {
            kafkaSink.invoke(record);
        } catch (Throwable t) {
            System.out.println("Error writing record " + record + " to Kafka");
            t.printStackTrace();
        }
    }
}


The problem with this approach is that the ParDo is not initialized with the streaming runtime context that FlinkKafkaProducer relies upon, so open() fails.
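To make the failure mode concrete, here is a minimal, self-contained sketch (the classes below are stand-ins I wrote for illustration, not the real Flink or Beam types): a rich sink's open() depends on a runtime context that the Flink runtime normally injects before calling open(); invoking open() by hand from a DoFn skips that injection, so the call fails.

```java
// Illustrative stand-ins only; real Flink classes are RichSinkFunction /
// RuntimeContext, which behave analogously but are not used here.
public class RuntimeContextSketch {

    /** Stand-in for Flink's RuntimeContext, normally supplied by the runtime. */
    static class RuntimeContext {
        String taskName() { return "task-0"; }
    }

    /** Stand-in for a rich sink: open() needs the injected runtime context. */
    static class RichSink {
        private RuntimeContext ctx; // injected by the Flink runtime, not by user code

        void setRuntimeContext(RuntimeContext ctx) { this.ctx = ctx; }

        void open() {
            if (ctx == null) {
                // This is the situation inside the ParDo workaround above:
                // nobody has injected a context, so initialization fails.
                throw new IllegalStateException("runtime context not set");
            }
            System.out.println("opened in " + ctx.taskName());
        }
    }

    public static void main(String[] args) {
        RichSink sink = new RichSink();
        boolean failedWithoutContext = false;
        try {
            sink.open(); // what calling kafkaSink.open(...) from a DoFn amounts to
        } catch (IllegalStateException e) {
            failedWithoutContext = true;
        }
        System.out.println("open() without context failed: " + failedWithoutContext);

        // When the runtime injects the context first, open() succeeds.
        sink.setRuntimeContext(new RuntimeContext());
        sink.open();
    }
}
```

The sketch only mirrors the lifecycle ordering (inject context, then open); the real producer's open() additionally touches Kafka networking, which this deliberately leaves out.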


Any suggestions?