Kafka Rate Limit in FlinkConsumer ?

Kafka Rate Limit in FlinkConsumer ?

David Magalhães
I noticed that FLINK-11501 was implemented in flink-connector-kafka-0.10 [1], but not in the current version of flink-connector-kafka. Is there a reason for this, and what would be the best way to implement rate limiting in the current Kafka consumer?

Thanks,
David

[1] 
https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java

Re: Kafka Rate Limit in FlinkConsumer ?

Chen Qin
My two cents:

- A Flink job already has backpressure, so in some use cases rate limiting can be achieved by setting the parallelism to a suitable number. There is an open issue around checkpointing reliability under backpressure; the community seems to be working on it.

- Rate limiting is easy to misuse and can cause a lot of confusion. Think of a use case where two streams feed a simple interval join. Unless you can rate limit both dynamically with suitable values, the gap between their timestamps and watermarks may keep growing, eventually causing checkpoint failures.

So instead of looking at the rate limit of a single source, the question might be how to slow down all sources without ever-increasing timestamp and watermark gaps. That already sounds complicated.
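
To make the interval-join concern concrete, here is a back-of-the-envelope sketch (not from the thread). It assumes two backlogged topics whose records are spread evenly over event time, each read under the same static limit; the class name, the densities, and the limits are all made up for illustration.

```java
public class WatermarkGapSketch {

    /**
     * Event-time seconds a source has covered after reading for wallSeconds,
     * given its rate limit and how many records the topic holds per second of event time.
     */
    static double eventTimeProgress(double recordsPerSecond,
                                    double recordsPerEventSecond,
                                    double wallSeconds) {
        return recordsPerSecond * wallSeconds / recordsPerEventSecond;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: both sources are limited to 1000 records/s,
        // but topic B is five times denser in event time than topic A.
        double limitA = 1000, densityA = 100; // A advances 10 event-seconds per wall second
        double limitB = 1000, densityB = 500; // B advances 2 event-seconds per wall second

        for (int t = 60; t <= 180; t += 60) {
            double gap = eventTimeProgress(limitA, densityA, t)
                    - eventTimeProgress(limitB, densityB, t);
            System.out.printf("after %d s wall time, event-time gap = %.0f s%n", t, gap);
        }
    }
}
```

With these numbers the gap grows by 8 event-seconds for every second of wall time (480 s, 960 s, 1440 s), so the join has to buffer more and more of stream A. Closing the gap would require limits proportional to each topic's density, which is why a single static value per source is not enough.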

That being said, if you really want your own rate limit, you can try the following code :) It works well for us.
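// Imports were not shown in the original post. The package names below assume the
// universal Kafka connector of a pre-1.11 Flink release and Guava's RateLimiter.
import java.util.Map;
import java.util.Properties;

import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SynchronousKafkaConsumer<T> extends FlinkKafkaConsumer<T> {

    protected static final Logger LOG = LoggerFactory.getLogger(SynchronousKafkaConsumer.class);

    // Rate limit for the whole topic, in records per second.
    private final double topicRateLimit;
    // Created in open() on each subtask, hence transient.
    private transient RateLimiter subtaskRateLimiter;

    // Constructor not shown in the original post; added so that the final field is initialized.
    public SynchronousKafkaConsumer(
            String topic,
            DeserializationSchema<T> valueDeserializer,
            Properties props,
            double topicRateLimit) {
        super(topic, valueDeserializer, props);
        this.topicRateLimit = topicRateLimit;
    }

    @Override
    public void open(Configuration configuration) throws Exception {
        // Split the topic-wide limit evenly across the parallel subtasks.
        double subtaskRateLimit =
                topicRateLimit / getRuntimeContext().getNumberOfParallelSubtasks();
        Preconditions.checkArgument(
                subtaskRateLimit > 0.1,
                "subtask ratelimit should be greater than 0.1 QPS");
        subtaskRateLimiter = RateLimiter.create(subtaskRateLimit);
        super.open(configuration);
    }

    @Override
    protected AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> partitionsWithOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {

        // Same fetcher as the parent class, but every emit first blocks on the rate limiter.
        return new KafkaFetcher<T>(
                sourceContext,
                partitionsWithOffsets,
                watermarksPeriodic,
                watermarksPunctuated,
                runtimeContext.getProcessingTimeService(),
                runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
                runtimeContext.getUserCodeClassLoader(),
                runtimeContext.getTaskNameWithSubtasks(),
                deserializer,
                properties,
                pollTimeout,
                runtimeContext.getMetricGroup(),
                consumerMetricGroup,
                useMetrics) {

            @Override
            protected void emitRecord(
                    T record,
                    KafkaTopicPartitionState<TopicPartition> partitionState,
                    long offset) throws Exception {
                subtaskRateLimiter.acquire(); // blocks until a permit is available
                if (record == null) {
                    // Count records that deserialized to null; they are still passed on.
                    consumerMetricGroup.counter("invalidRecord").inc();
                }
                super.emitRecord(record, partitionState, offset);
            }

            @Override
            protected void emitRecordWithTimestamp(
                    T record,
                    KafkaTopicPartitionState<TopicPartition> partitionState,
                    long offset,
                    long timestamp) throws Exception {
                subtaskRateLimiter.acquire(); // blocks until a permit is available
                if (record == null) {
                    consumerMetricGroup.counter("invalidRecord").inc();
                }
                super.emitRecordWithTimestamp(record, partitionState, offset, timestamp);
            }
        };
    }
}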
Thanks,

Chen
Pinterest Data
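
For readers who want to see what the per-subtask arithmetic in open() and the blocking acquire() call amount to, here is a small stand-alone sketch using only the JDK. The Pacer class is a hypothetical stand-in and not Guava's RateLimiter (which also allows short bursts of saved-up permits); all class and method names here are made up.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class SubtaskRateSketch {

    /** Splits a topic-wide limit evenly across subtasks and rejects rates of 0.1 QPS or less. */
    static double perSubtaskRate(double topicRateLimit, int parallelism) {
        double rate = topicRateLimit / parallelism;
        if (rate <= 0.1) {
            throw new IllegalArgumentException("subtask ratelimit should be greater than 0.1 QPS");
        }
        return rate;
    }

    /** Minimal blocking pacer: consecutive acquire() calls return at least 1/rate seconds apart. */
    static final class Pacer {
        private final long intervalNanos;
        private long nextFreeNanos = System.nanoTime();

        Pacer(double permitsPerSecond) {
            this.intervalNanos = (long) (TimeUnit.SECONDS.toNanos(1) / permitsPerSecond);
        }

        synchronized void acquire() {
            long now = System.nanoTime();
            if (nextFreeNanos - now < 0) {
                nextFreeNanos = now; // after idling, do not accumulate burst credit
            }
            long deadline = nextFreeNanos;
            nextFreeNanos = deadline + intervalNanos;
            long remaining;
            // parkNanos may return early, so loop until the deadline has really passed.
            while ((remaining = deadline - System.nanoTime()) > 0) {
                LockSupport.parkNanos(remaining);
            }
        }
    }

    public static void main(String[] args) {
        // A 100 records/s topic limit shared by 4 subtasks gives 25 records/s each.
        double rate = perSubtaskRate(100.0, 4);
        System.out.println("per-subtask rate: " + rate);

        Pacer pacer = new Pacer(rate);
        long start = System.nanoTime();
        for (int i = 0; i < 5; i++) {
            pacer.acquire(); // stands in for the call made before each emitted record
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("5 permits took about " + elapsedMillis + " ms");
    }
}
```

Because the limit is divided by the job's parallelism, rescaling the job changes each subtask's share but not the topic-wide total. Subtasks that own few or no partitions leave their share unused, so the effective topic rate can end up below the configured limit.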


Re: Kafka Rate Limit in FlinkConsumer ?

David Magalhães
Thanks for the reply Chen.

My use case is a "simple" copy from Kafka into S3. The job can read from Kafka very quickly, and S3 has trouble keeping up. Backpressure doesn't have enough time to take effect in this case, and by the time the checkpoint is due, errors such as heartbeat timeouts or task managers not replying start to happen.

I will investigate further and try this example.

Re: Kafka Rate Limit in FlinkConsumer ?

Till Rohrmann
Two quick comments: With unaligned checkpoints, which were released in Flink 1.11.0, the problem of slow checkpoints under backpressure has been resolved, or at least mitigated to a good extent. Moreover, the community wants to work on event-time alignment for sources in the next release. This should prevent different sources from diverging too much with respect to event time.

Cheers,
Till
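
As a minimal sketch of turning on the unaligned checkpoints mentioned above, assuming Flink 1.11 or later on the classpath. The 60-second interval and the class name are illustrative only, and the job graph itself is omitted.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointConfig {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 s (illustrative value); the default mode is exactly-once,
        // which unaligned checkpoints require.
        env.enableCheckpointing(60_000L);

        // Let checkpoint barriers overtake buffered records so that checkpoints
        // can still complete while the job is backpressured.
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        // ... define sources, transformations, and sinks, then call env.execute(...)
    }
}
```

Unaligned checkpoints persist in-flight records as part of the checkpoint, so they trade faster checkpoint completion under backpressure for larger checkpoint state.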
