FlinkKafkaProducer usage

FlinkKafkaProducer usage

Boris Lublinsky
I am trying to write a quick sample of streaming word count using the Beam APIs and the FlinkBeamRunner.
The problem I am running into is that

apply("Write to Kafka", Write.to(UnboundedFlinkSink.of(kafkaSink)))

does not work here: Write assumes a bounded collection, and mine is unbounded.

I have not found an unbounded equivalent of Write, so I tried to implement a custom ParDo function:

/**
 * Write content to Kafka.
 */
static class WriteToKafkaFn extends DoFn<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink;
    private boolean opened = false;

    public WriteToKafkaFn(FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink) {
        this.kafkaSink = kafkaSink;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        if (!opened) {
            kafkaSink.open(new Configuration());
            opened = true;
        }
        Tuple2<String, Integer> record = c.element();
        try {
            kafkaSink.invoke(record);
        } catch (Throwable t) {
            System.out.println("Error writing record " + record + " to Kafka");
            t.printStackTrace();
        }
    }
}


The problem with this approach is that the ParDo is not initialized with the Flink streaming runtime context that FlinkKafkaProducer relies upon, so open() fails.
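For comparison, the DoFn lifecycle offers @Setup and @Teardown hooks that avoid depending on a Flink runtime context at all. A minimal sketch, assuming a plain producer from the kafka-clients library and Beam's KV type in place of Flink's Tuple2 and sink (class, broker, and topic names here are illustrative, not from the original thread):

```java
import java.util.Properties;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical sketch: manage a plain Kafka producer through the DoFn
// lifecycle instead of reusing Flink's sink, which expects to be opened
// with a Flink runtime context.
class KafkaWriteFn extends DoFn<KV<String, Integer>, Void> {

    private final String bootstrapServers;
    private final String topic;
    private transient KafkaProducer<String, String> producer;

    KafkaWriteFn(String bootstrapServers, String topic) {
        this.bootstrapServers = bootstrapServers;
        this.topic = topic;
    }

    @Setup
    public void setup() {
        // Created once per DoFn instance on the worker, not per element.
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        KV<String, Integer> e = c.element();
        producer.send(new ProducerRecord<>(topic, e.getKey(), e.getValue().toString()));
    }

    @Teardown
    public void teardown() {
        if (producer != null) {
            producer.close();
        }
    }
}
```

With @Setup the producer is created once per DoFn instance and reused across bundles, and @Teardown closes it, so no Flink-specific open() call and no hand-rolled "opened" flag are needed.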


Any suggestions? 
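One direction that may be worth checking is Beam's own KafkaIO connector, which can write from unbounded collections. A minimal sketch, assuming KafkaIO is available in the Beam version in use and that the counts form a PCollection<KV<String, Integer>> (the broker address and topic name are placeholders):

```java
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// counts: PCollection<KV<String, Integer>> produced by the word-count pipeline
counts.apply("Write to Kafka",
    KafkaIO.<String, Integer>write()
        .withBootstrapServers("localhost:9092")   // placeholder broker address
        .withTopic("wordcounts")                  // placeholder topic name
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(IntegerSerializer.class));
```

Because KafkaIO is a Beam-native sink, the runner manages its lifecycle, avoiding the runtime-context problem that the Flink sink wrapped in a ParDo runs into.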
Re: FlinkKafkaProducer usage

Dawid Wysakowicz

On 02.02.2017 at 1:07 AM, "Boris Lublinsky" <[hidden email]> wrote: