I am trying to write a quick sample of streaming word count using the Beam APIs and the FlinkBeamRunner. The problem I am running into is that

apply("Write to Kafka", Write.to(UnboundedFlinkSink.of(kafkaSink)))

does not work in this case - it assumes a bounded stream and mine is unbounded. I have not found an unbounded equivalent of Write, so I tried to implement a custom ParDo function:

/**
* Write content to Kafka.
*
*/
static class WriteToKafkaFn extends DoFn<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private final FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink;
    private boolean opened = false;

    public WriteToKafkaFn(FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink) {
        this.kafkaSink = kafkaSink;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Lazily open the producer on the first element.
        if (!opened) {
            kafkaSink.open(new Configuration());
            opened = true;
        }
        Tuple2<String, Integer> record = c.element();
        try {
            kafkaSink.invoke(record);
        } catch (Throwable t) {
            System.out.println("Error writing record " + record + " to Kafka");
            t.printStackTrace();
        }
    }
}

The problem with this approach is that the ParDo is not initialized with the streaming context that the FlinkKafkaProducer relies on, so open() fails. Any suggestions?
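For context, this is roughly how the DoFn is wired into the pipeline. It is only a minimal sketch: the broker address, topic name, serialization schema and the upstream "counts" collection are placeholders, not the actual job.

// Minimal sketch only: broker, topic and serializationSchema are placeholders,
// and "counts" stands for the unbounded PCollection<Tuple2<String, Integer>>
// produced by the streaming word count upstream.
FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink =
        new FlinkKafkaProducer09<>("localhost:9092", "wordcount-output", serializationSchema);

counts.apply("Write to Kafka", ParDo.of(new WriteToKafkaFn(kafkaSink)));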