I’ve been trying to optimize the performance of my application by experimenting with how data is serialized when it is sent between operators.
Currently I have a source stream (1) reading Avro data from Kafka, an operator (2) that converts the Avro data and generates a byte[] from a POJO, and lastly an operator (3) that filters the inputs. Operators 2 and 3 both extend “ProcessFunction”.
1.
DataStream<byte[]> kafkaStream = env.addSource(flinkKafkaConsumer).setParallelism(KAFKA_PARALLELISM).name("KafkaConsumer");
2.
AvroBytesConversionPerfProcess avroBytesConversionProcess = new AvroBytesConversionPerfProcess(schemaRegistryUrl) {};
SingleOutputStreamOperator<byte[]> avroProccesedStream = kafkaStream.process(avroBytesConversionProcess).setParallelism(AVRO_PARALLELISM).name("AvroStreamProcess");
3.
ExtractTransformPerfProcess extractor = new ExtractTransformPerfProcess();
SingleOutputStreamOperator<Tuple2<String, String>> extractTransformedStream = avroProccesedStream.process(extractor).setParallelism(EXTRACT_PARALLELISM).name("ExtractTransformProcess");
4. POJO
public class CustomRecord {
    public String parentName;
    private String fieldName;
    private Type fieldType;
    private Object fieldValue;
    private Object defaultValue;
}
The throughput of Kafka records out/sec is around 4K TPS if I emit a dummy byte[] (e.g. the bytes of the current time in milliseconds) from operator 2 to operator 3, but the moment I convert the POJO to Avro and emit the resulting byte[] from operator 2, throughput drops to 1.5K TPS. Why does it drop so much even though operator 3 doesn’t do much?
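One way to narrow this down might be to time the conversion step on its own, outside Flink, and compare it against returning a dummy byte[]. The following is only a minimal sketch under stated assumptions: `Record`, `encode`, and `callsPerSecond` are hypothetical names, `Record` is a simplified stand-in for CustomRecord with String fields only, and `DataOutputStream` stands in for the real Avro conversion so that the example runs with the JDK alone. It is a rough timing loop, not a rigorous benchmark.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical sketch: times a per-record conversion in isolation.
final class SerializationCostSketch {

    // Simplified stand-in for CustomRecord (assumption: String fields only).
    static final class Record {
        final String parentName;
        final String fieldName;
        final String fieldValue;

        Record(String parentName, String fieldName, String fieldValue) {
            this.parentName = parentName;
            this.fieldName = fieldName;
            this.fieldValue = fieldValue;
        }
    }

    // Written once per measurement so the JIT cannot discard the measured work.
    static volatile long sink;

    // Stand-in for the POJO -> byte[] step; swap in the real conversion to measure it.
    static byte[] encode(Record r) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream(64);
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeUTF(r.parentName);
            out.writeUTF(r.fieldName);
            out.writeUTF(r.fieldValue);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Invokes the supplier `iterations` times and returns invocations per second.
    static double callsPerSecond(Supplier<byte[]> work, int iterations) {
        long total = 0;
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            total += work.get().length;
        }
        long elapsedNanos = Math.max(1L, System.nanoTime() - start);
        sink = total;
        return iterations * 1e9 / elapsedNanos;
    }

    public static void main(String[] args) {
        Record record = new Record("parent", "field", "value");
        byte[] dummy = new byte[8];

        // Warm-up pass so the timed runs are less affected by JIT compilation.
        callsPerSecond(() -> encode(record), 200_000);

        System.out.printf("dummy byte[]: %.0f calls/s%n",
                callsPerSecond(() -> dummy, 1_000_000));
        System.out.printf("encode:       %.0f calls/s%n",
                callsPerSecond(() -> encode(record), 1_000_000));
    }
}
```

If the conversion alone sustains far more than 1.5K calls per second on a single thread, the drop probably comes from somewhere other than the encoding itself, for example from how records are handed between operators.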