Improving performance throughput in operators

Posted by Arpith P on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Improving-performance-throughput-in-operators-tp38469.html

I've been trying to optimize the performance of my application by experimenting with the serialization used when sending data between operators.

Currently I have a source stream (1) that reads Avro data from Kafka, an operator (2) that converts the Avro data and generates a byte[] from a POJO, and finally an operator (3) that filters the inputs. Operators 2 and 3 both extend ProcessFunction.

1. Kafka source

    DataStream<byte[]> kafkaStream = env.addSource(flinkKafkaConsumer)
            .setParallelism(KAFKA_PARALLELISM)
            .name("KafkaConsumer");

2. Avro conversion operator

    AvroBytesConversionPerfProcess avroBytesConversionProcess = new AvroBytesConversionPerfProcess(schemaRegistryUrl) {};
    SingleOutputStreamOperator<byte[]> avroProccesedStream = kafkaStream
            .process(avroBytesConversionProcess)
            .setParallelism(AVRO_PARALLELISM)
            .name("AvroStreamProcess");

3. Extract/filter operator

    ExtractTransformPerfProcess extractor = new ExtractTransformPerfProcess();
    SingleOutputStreamOperator<Tuple2<String, String>> extractTransformedStream = avroProccesedStream
            .process(extractor)
            .setParallelism(EXTRACT_PARALLELISM)
            .name("ExtractTransformProcess");

4. POJO

    public class CustomRecord {
        public String parentName;
        private String fieldName;
        private Type fieldType;
        private Object fieldValue;
        private Object defaultValue;
    }
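One possible cause worth ruling out is per-record setup inside processElement of operator 2: creating a new writer, encoder, or output buffer for every element instead of once per operator instance (for example in open()). The sketch below shows that reuse pattern using plain JDK streams as a stand-in for Avro, so it is not the Avro conversion itself. SimpleRecord is a hypothetical, simplified version of CustomRecord with only String fields (the real Type and Object fields would need a type-aware encoding), and ReusingRecordWriter is a hypothetical helper. With Avro, the analogous approach would be to keep one DatumWriter per operator and pass the previous BinaryEncoder back into EncoderFactory.get().binaryEncoder(out, reuse).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical, simplified stand-in for CustomRecord (String fields only; nulls not supported). */
final class SimpleRecord {
    final String parentName;
    final String fieldName;
    final String fieldType;
    final String fieldValue;

    SimpleRecord(String parentName, String fieldName, String fieldType, String fieldValue) {
        this.parentName = parentName;
        this.fieldName = fieldName;
        this.fieldType = fieldType;
        this.fieldValue = fieldValue;
    }
}

/**
 * Hypothetical writer meant to be created once per operator instance
 * (e.g. in open()) and reused for every record.
 */
final class ReusingRecordWriter {
    // One buffer and one stream, shared by all records handled by this instance.
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
    private final DataOutputStream out = new DataOutputStream(buffer);

    byte[] write(SimpleRecord record) {
        buffer.reset(); // rewind to position 0; the already-grown backing array is kept
        try {
            out.writeUTF(record.parentName);
            out.writeUTF(record.fieldName);
            out.writeUTF(record.fieldType);
            out.writeUTF(record.fieldValue);
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Copy out, so the emitted array is not overwritten by the next record.
        return buffer.toByteArray();
    }

    /** Decodes bytes produced by write(); used here to check the round trip. */
    static SimpleRecord read(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            // Java evaluates arguments left to right, matching the write order.
            return new SimpleRecord(in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

toByteArray() still copies once per record. That copy is deliberate: the emitted byte[] must not share a backing array that the next call will overwrite.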

[Attached image: image.png]

Throughput (Kafka records out/sec) is around 4K TPS when operator 2 emits a dummy byte[] to operator 3 (e.g. bytes derived from the current time in milliseconds), but as soon as operator 2 converts the POJO to Avro and emits the resulting byte[], throughput drops to 1.5K TPS. Why does it drop so much, even though operator 3 doesn't do much?
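To put the two figures in perspective, a throughput number can be turned into a per-record time budget. This sketch assumes the quoted TPS is the aggregate rate across `parallelism` subtasks of the bottleneck operator, and that backpressure makes the Kafka source rate reflect that operator's rate. ThroughputBudget is a hypothetical helper.

```java
/**
 * Hypothetical helper: converts a throughput figure into the time
 * each record may take on one subtask.
 */
final class ThroughputBudget {
    private ThroughputBudget() {}

    /**
     * @param tps         aggregate records per second across all subtasks
     * @param parallelism number of parallel subtasks sharing that rate
     * @return microseconds available per record on a single subtask
     */
    static double microsPerRecord(double tps, int parallelism) {
        // Each subtask handles tps / parallelism records per second,
        // so one record may take parallelism / tps seconds.
        return parallelism * 1_000_000.0 / tps;
    }

    /** Extra microseconds per record implied by a drop from fastTps to slowTps. */
    static double addedMicrosPerRecord(double fastTps, double slowTps, int parallelism) {
        return microsPerRecord(slowTps, parallelism) - microsPerRecord(fastTps, parallelism);
    }
}
```

With a parallelism of 1, 4K TPS leaves 250 µs per record and 1.5K TPS about 667 µs, so the conversion adds roughly 417 µs per record; with AVRO_PARALLELISM subtasks, the per-subtask figure is that many times larger. An extra cost of that size is likely large compared with binary-encoding a five-field record, which makes it worth profiling operator 2 for per-record setup work (schema lookups, writer or encoder construction, reflection over the Object fields) rather than the encoding itself.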