Hi,
I haven't heard of a Flink-specific problem like that. Have you checked that the records are not changing over time, for example becoming twice as large or twice as expensive to process? This matters especially because you are using a rate limiter set to 12MB/s: at 200 messages/sec that limit is reached once records average about 60KB (12,000,000 bytes/s ÷ 200 messages/s), so if your records grew to that size, it would probably explain the problem.
If that's not it, you will have to analyse what is changing in your job or cluster over time, starting with CPU usage, memory/GC, network and IO. You can also attach a profiler to help pinpoint what is changing.
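To make that arithmetic concrete, here is a minimal sketch. It is not taken from your job: the class and method names (RecordSizeCheck, impliedBytesPerRecord, observe, averageBytes) are hypothetical, and it assumes the rate passed to the limiter is in bytes per second, as the 12MB/s figure suggests. The small tracker could be fed message.length from inside your deserialize(byte[] message) to see whether the average record size drifts.

```java
// Hypothetical helper, not part of Flink or Kafka.
final class RecordSizeCheck {

    /**
     * Average record size (in bytes) at which a byte-rate limit
     * caps throughput at the given number of records per second.
     * Assumes the limit is expressed in bytes per second.
     */
    static long impliedBytesPerRecord(long rateLimitBytesPerSec, long recordsPerSec) {
        return rateLimitBytesPerSec / recordsPerSec;
    }

    private long records; // number of records observed so far
    private long bytes;   // total serialized bytes observed so far

    /** Call once per record with the length of the serialized message. */
    void observe(int recordLength) {
        records++;
        bytes += recordLength;
    }

    /** Average observed record size in bytes, or 0 if nothing was observed. */
    long averageBytes() {
        return records == 0 ? 0 : bytes / records;
    }

    public static void main(String[] args) {
        long limit = 12_000_000L; // the value passed to setRate(...)
        System.out.println(impliedBytesPerRecord(limit, 400)); // 30000
        System.out.println(impliedBytesPerRecord(limit, 200)); // 60000
    }
}
```

If the observed average is close to 30KB while the job runs at 400 messages/sec and close to 60KB once it drops to 200 messages/sec, the rate limiter rather than Flink itself would be the likely cause.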
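As a starting point for the memory/GC part, the JVM's standard management beans can be sampled from inside the process. The sketch below is only an illustration under that assumption: JvmSnapshot and its fields are hypothetical names, and it uses nothing beyond java.lang.management. Comparing a snapshot taken early in the run with one taken after the slowdown shows whether GC activity or heap usage has changed; a profiler or your cluster's own metrics may give the same information with less effort.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper that captures cumulative GC activity and current heap usage.
final class JvmSnapshot {
    final long gcCount;       // total collections across all collectors
    final long gcTimeMillis;  // approximate accumulated collection time
    final long heapUsedBytes; // heap in use at the time of the snapshot

    private JvmSnapshot(long gcCount, long gcTimeMillis, long heapUsedBytes) {
        this.gcCount = gcCount;
        this.gcTimeMillis = gcTimeMillis;
        this.heapUsedBytes = heapUsedBytes;
    }

    static JvmSnapshot take() {
        long count = 0;
        long time = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Both getters return -1 when the value is undefined for a collector.
            count += Math.max(0, gc.getCollectionCount());
            time += Math.max(0, gc.getCollectionTime());
        }
        long heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
        return new JvmSnapshot(count, time, heap);
    }

    public static void main(String[] args) throws InterruptedException {
        JvmSnapshot before = take();
        Thread.sleep(1000); // in a real job, sample at a fixed interval instead
        JvmSnapshot after = take();
        System.out.printf("GC runs: %d, GC time: %d ms, heap used: %d bytes%n",
                after.gcCount - before.gcCount,
                after.gcTimeMillis - before.gcTimeMillis,
                after.heapUsedBytes);
    }
}
```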
Piotrek
I currently have a Flink consumer with the following properties. At the start of the program it consumes records at around 400 messages/sec, but later on, once numBuffersOut exceeds 1000000, the data rate falls to 200 messages/sec. Parallelism is set to 1, the consumer is Avro based, and checkpointing is disabled. Is anyone else facing the same issue?
// Kafka consumer properties
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", bootstrapAddress);
kafkaProps.setProperty("zookeeper.connect", zookeeperConnect);
kafkaProps.setProperty("group.id", groupId);
// Checkpointing is disabled, so offsets are committed by Kafka's periodic auto-commit
kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
kafkaProps.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

// Avro consumer that starts from the latest offsets
ConfluentAvroDeserializationSchema deserializer = new ConfluentAvroDeserializationSchema(schemaRegURL);
FlinkKafkaConsumer010<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer010<>(topic, deserializer, kafkaProps);
flinkKafkaConsumer.setStartFromLatest();

// Rate limiter set to 12000000
FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(12000000L);
flinkKafkaConsumer.setRateLimiter(rateLimiter);
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);

// ConfluentAvroDeserializationSchema
@Override
public GenericRecord deserialize(byte[] message) {
    // Create the decoder lazily on the first call
    if (kafkaAvroDecoder == null) {
        System.out.println("Kafka deserializer");
        SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
        this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
    }
    return (GenericRecord) this.kafkaAvroDecoder.fromBytes(message);
}