[Question] How to have additional logging in Apache Beam KafkaWriter


Eleanore Jin
Hi there, 

I am using Apache Beam (v2.16) in my application, and the runner is Flink (1.8). I use the KafkaIO connector to consume from source topics and publish to sink topics.

Here is the class that Apache Beam provides for publishing messages (KafkaWriter).

Due to a requirement, I need to log at info level every message that has been published (regardless of whether publishing succeeded or failed).

So essentially, I need the logging below added to this class. Are there any suggestions for how to do this?
private class SendCallback implements Callback {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception == null) {
      // Success: log the published message at info level.
      LOG.info("PublishToKafkaTopic. Published someId={} to topic={}", someId, topic);
    } else {
      // Failure: the trailing exception argument makes the logger include the stack trace.
      LOG.error("PublishToKafkaTopic. Error publishing someId={} to topic={}", someId, topic, exception);
    }
  }
}

Thanks a lot!
Eleanore
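
The callback in the post refers to someId and topic, which a callback only has access to if they are captured when it is created. A minimal sketch of that pattern follows, under stated assumptions: the Callback interface here is a simplified stand-in for Kafka's producer callback (metadata is a plain Object), LoggingSendCallback and LOG_LINES are hypothetical names, and an in-memory list replaces the real logger so the example runs on its own. How such a callback could be wired into Beam's writer is not shown.

```java
import java.util.ArrayList;
import java.util.List;

public class LoggingCallbackSketch {

    /** Simplified stand-in for Kafka's producer callback (assumption: metadata is a plain Object). */
    public interface Callback {
        void onCompletion(Object metadata, Exception exception);
    }

    /** In-memory log sink so the sketch runs without a logging framework (hypothetical). */
    public static final List<String> LOG_LINES = new ArrayList<>();

    /** Callback that captures the values it needs to log at construction time. */
    public static final class LoggingSendCallback implements Callback {
        private final String someId;
        private final String topic;
        private final Callback delegate; // optional existing callback; may be null

        public LoggingSendCallback(String someId, String topic, Callback delegate) {
            this.someId = someId;
            this.topic = topic;
            this.delegate = delegate;
        }

        @Override
        public void onCompletion(Object metadata, Exception exception) {
            if (exception == null) {
                LOG_LINES.add("INFO PublishToKafkaTopic. Published someId=" + someId
                        + " to topic=" + topic);
            } else {
                LOG_LINES.add("ERROR PublishToKafkaTopic. Error publishing someId=" + someId
                        + " to topic=" + topic + ": " + exception.getMessage());
            }
            // Preserve any existing completion handling by forwarding the result.
            if (delegate != null) {
                delegate.onCompletion(metadata, exception);
            }
        }
    }

    public static void main(String[] args) {
        // Simulate one successful and one failed send.
        new LoggingSendCallback("id-1", "sink-topic", null).onCompletion(new Object(), null);
        new LoggingSendCallback("id-2", "sink-topic", null)
                .onCompletion(null, new RuntimeException("broker unavailable"));
        LOG_LINES.forEach(System.out::println);
    }
}
```

Creating one callback instance per record keeps the identifiers with the send they belong to, and forwarding to a delegate leaves any existing error handling in place.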