FlinkKafkaProducer010: why is checkErroneous() at the beginning of the invoke() method

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

FlinkKafkaProducer010: why is checkErroneous() at the beginning of the invoke() method

HarshithBolar

Hi all,

 

I had a requirement to handle Kafka producer exceptions so that they don’t bring down the job. I extended FlinkKafkaProducer010 and handled the exceptions there.

 

public void invoke(T value, Context context) throws Exception {

              try {

this.checkErroneous();

                     ...

this.producer.send(record, this.callback);

              } catch (Exception exception) {

                     // Handle exception

              }

       }

The problem with this is, because checkErroneous() is at the beginning of the invoke() method, the catch block is getting triggered for the next message – not for the message that is causing the exception. So, I moved checkErroneous() below producer.send() like so –

 

       public void invoke(T value, Context context) throws Exception {

              try {

                     ...

                     this.producer.send(record, this.callback);

                     this.checkErroneous();

              } catch (Exception exception) {

                     // Handle exception

              }

       }

 

This solved the issue, the exceptions are now being thrown for the message that’s causing the error instead of the next message.

 

Is there a specific reason why checkErroneous() is on top? Or am I doing something wrong?

Class: https://github.com/apache/flink/blob/19d20e5cf8d44d726b4a44575e6c8db677e4c3c8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java

 

Regards,

Harshith

Reply | Threaded
Open this post in threaded view
|

Re: FlinkKafkaProducer010: why is checkErroneous() at the beginning of the invoke() method

Chesnay Schepler
It is called at the top since exceptions can occur asynchronously when invoke() already exited. In this case the only place you can fail is if the next record is being processed.

On 12/04/2019 11:00, Kumar Bolar, Harshith wrote:

Hi all,

 

I had a requirement to handle Kafka producer exceptions so that they don’t bring down the job. I extended FlinkKafkaProducer010 and handled the exceptions there.

 

public void invoke(T value, Context context) throws Exception {

              try {

this.checkErroneous();

                     ...

this.producer.send(record, this.callback);

              } catch (Exception exception) {

                     // Handle exception

              }

       }

The problem with this is, because checkErroneous() is at the beginning of the invoke() method, the catch block is getting triggered for the next message – not for the message that is causing the exception. So, I moved checkErroneous() below producer.send() like so –

 

       public void invoke(T value, Context context) throws Exception {

              try {

                     ...

                     this.producer.send(record, this.callback);

                     this.checkErroneous();

              } catch (Exception exception) {

                     // Handle exception

              }

       }

 

This solved the issue, the exceptions are now being thrown for the message that’s causing the error instead of the next message.

 

Is there a specific reason why checkErroneous() is on top? Or am I doing something wrong?

Class: https://github.com/apache/flink/blob/19d20e5cf8d44d726b4a44575e6c8db677e4c3c8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java

 

Regards,

Harshith