Flink Kafka Consumer Throughput reduces over time

Flink Kafka Consumer Throughput reduces over time

Arpith techy
I currently have a Flink Kafka consumer with the following properties. Flink consumes records at around 400 messages/sec at the start of the program, but later, once numBuffersOut exceeds 1,000,000, the data rate falls to 200 messages/sec. Parallelism is set to 1, the consumer is Avro-based, and checkpointing is disabled. Is anyone else facing the same issue?

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", bootstrapAddress);
kafkaProps.setProperty("zookeeper.connect", zookeeperConnect);
kafkaProps.setProperty("group.id", groupId);
kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
kafkaProps.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
ConfluentAvroDeserializationSchema deserializer = new ConfluentAvroDeserializationSchema(schemaRegURL);
FlinkKafkaConsumer010<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer010<GenericRecord>(topic, deserializer, kafkaProps);
flinkKafkaConsumer.setStartFromLatest();
FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(12000000L);
flinkKafkaConsumer.setRateLimiter(rateLimiter);
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);
// ConfluentAvroDeserializationSchema#deserialize
@Override
public GenericRecord deserialize(byte[] message) {
    if (kafkaAvroDecoder == null) {
        // Lazily create the decoder on first use, on the task manager.
        System.out.println("Initializing Kafka Avro decoder");
        SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
        this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
    }
    return (GenericRecord) this.kafkaAvroDecoder.fromBytes(message);
}

Re: Flink Kafka Consumer Throughput reduces over time

Piotr Nowojski-3
Hi,

I haven't heard of a Flink-specific problem like that. Have you checked whether the records are changing over time? Whether they are, for example, twice as large or twice as heavy to process? Especially since you are using a rate limiter set to 12 MB/s: if your records grew to 60 KB in size, that would probably explain the problem.
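To make that arithmetic concrete, here is a small illustrative Java sketch. Only the 12 MB/s limit and the 60 KB figure come from the numbers above; the 30 KB "before" size is an assumption chosen so the example matches the observed 400 messages/sec:

long rateLimitBytesPerSec = 12_000_000L; // the value passed to rateLimiter.setRate(...)
long smallRecordBytes = 30_000L;         // assumed average record size early on (illustrative)
long largeRecordBytes = 60_000L;         // assumed record size after growth (illustrative)

// A byte-based rate limit caps message throughput at bytes/sec divided by record size.
System.out.println(rateLimitBytesPerSec / smallRecordBytes); // 400 messages/sec
System.out.println(rateLimitBytesPerSec / largeRecordBytes); // 200 messages/sec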

If that's not it, you would have to analyse what is changing in your job/cluster, starting with CPU usage, memory/GC, network and I/O. You can also attach a profiler to help pinpoint what is changing.
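If attaching a full profiler is not convenient, a lightweight first step is to log JVM GC counters and compare them before and after the slowdown. A minimal sketch using the standard java.lang.management API (the class name and where you call it from are up to you; this is not a Flink API):

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public final class GcStatsLogger {
    // Prints cumulative GC counts and times for every collector in this JVM.
    public static void log() {
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.println(gc.getName()
                    + " collections=" + gc.getCollectionCount()
                    + " timeMs=" + gc.getCollectionTime());
        }
    }
}

Calling this every few million records from the deserializer (or any operator) will show whether GC time keeps growing as numBuffersOut grows.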

Piotrek 
