Re: FlinkKafkaProducer usage

Posted by Dawid Wysakowicz on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/FlinkKafkaProducer-usage-tp11395p11399.html

Have a look at https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
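
For the unbounded case, KafkaIO.write() is the Beam-native Kafka sink the link above points at. A minimal sketch of wiring it in, assuming the counts have been converted to Beam KV pairs; the broker address and topic name are placeholders, and withKeySerializer/withValueSerializer reflect the current KafkaIO API (early releases configured coders instead):

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

PCollection<KV<String, Integer>> counts = ...; // the word counts as key/value pairs

counts.apply("Write to Kafka",
    KafkaIO.<String, Integer>write()
        .withBootstrapServers("localhost:9092") // placeholder broker address
        .withTopic("wordcounts")                // placeholder topic name
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(IntegerSerializer.class));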

On 02.02.2017 at 1:07 AM, "Boris Lublinsky" <[hidden email]> wrote:
I am trying to write a quick sample of streaming word count using Beam APIs and FlinkBeamRunner.
The problem I am running into is that
apply("Write to Kafka", Write.to(UnboundedFlinkSink.of(kafkaSink)))
does not work here: Write.to assumes a bounded collection, and mine is unbounded.

I have not found an unbounded equivalent of Write, so I tried to implement a custom ParDo function:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;

/**
 * Write content to Kafka by delegating to a Flink sink from inside a DoFn.
 */
static class WriteToKafkaFn extends DoFn<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private final FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink;
    private boolean opened = false;

    public WriteToKafkaFn(FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink) {
        this.kafkaSink = kafkaSink;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Lazily open the Flink sink on the first element; open() throws a
        // checked Exception, so it has to be wrapped. This is where it fails.
        if (!opened) {
            try {
                kafkaSink.open(new Configuration());
            } catch (Exception e) {
                throw new RuntimeException("Failed to open Kafka sink", e);
            }
            opened = true;
        }
        Tuple2<String, Integer> record = c.element();
        try {
            // Hand the element directly to the Flink producer.
            kafkaSink.invoke(record);
        } catch (Throwable t) {
            System.out.println("Error writing record " + record + " to Kafka");
            t.printStackTrace();
        }
    }
}


The problem with this approach is that the ParDo is not initialized with the Flink streaming runtime context that the FlinkKafkaProducer relies upon, so open() fails.
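
One runner-independent variation of the same idea is to use the plain Kafka client inside the DoFn, so that nothing depends on Flink's RichFunction lifecycle. A rough sketch; the broker address, topic name, and serializer choices are assumptions for illustration:

import java.util.Properties;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

static class PlainKafkaWriteFn extends DoFn<Tuple2<String, Integer>, Void> {

    private transient KafkaProducer<String, Integer> producer;

    @Setup
    public void setup() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.IntegerSerializer");
        producer = new KafkaProducer<>(props);
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        Tuple2<String, Integer> record = c.element();
        // "wordcounts" is an assumed topic name.
        producer.send(new ProducerRecord<>("wordcounts", record.f0, record.f1));
    }

    @Teardown
    public void teardown() {
        if (producer != null) {
            producer.close();
        }
    }
}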


Any suggestions?