Flink operator throttle

王雷 (May 14, 2020)
Hi all,

Does Flink support rate limiting?
How can we limit the rate when the external database that the sink operator writes to has a throughput limit?
Rather than relying on passive back pressure once the database reaches its limit, we want to limit the rate actively.
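
Active throttling of this kind is commonly done with a token bucket: tokens refill at a fixed rate up to a burst capacity, and every request to the database first takes a token. The sketch below is a minimal, self-contained Java version under that assumption; `TokenBucket` is a hypothetical class, not a Flink or Guava API, and the clock is injectable only so the behaviour can be checked deterministically.

```java
import java.util.function.LongSupplier;

/** Hypothetical token-bucket limiter; not a Flink or Guava class. */
public class TokenBucket {
    private final double ratePerSecond;   // refill rate in permits per second
    private final double capacity;        // maximum burst size
    private final LongSupplier nanoClock; // injectable time source (nanoseconds)
    private double tokens;
    private long lastRefillNanos;

    public TokenBucket(double ratePerSecond, double capacity, LongSupplier nanoClock) {
        if (ratePerSecond <= 0 || capacity <= 0) {
            throw new IllegalArgumentException("rate and capacity must be positive");
        }
        this.ratePerSecond = ratePerSecond;
        this.capacity = capacity;
        this.nanoClock = nanoClock;
        this.tokens = capacity; // start with a full bucket
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    public TokenBucket(double ratePerSecond, double capacity) {
        this(ratePerSecond, capacity, System::nanoTime);
    }

    // Adds the tokens earned since the last call, capped at the capacity.
    private void refill() {
        long now = nanoClock.getAsLong();
        tokens = Math.min(capacity, tokens + (now - lastRefillNanos) / 1e9 * ratePerSecond);
        lastRefillNanos = now;
    }

    /** Takes `permits` tokens if enough are available; never blocks. */
    public synchronized boolean tryAcquire(int permits) {
        refill();
        if (tokens >= permits) {
            tokens -= permits;
            return true;
        }
        return false;
    }

    /** Blocks the calling thread, polling every millisecond, until `permits` tokens are taken. */
    public void acquire(int permits) throws InterruptedException {
        if (permits > capacity) {
            throw new IllegalArgumentException("permits exceed bucket capacity");
        }
        while (!tryAcquire(permits)) {
            Thread.sleep(1);
        }
    }
}
```

As an assumption about how this would be wired in, a sink would create the limiter in its `open()` method and call `acquire(1)` at the start of `invoke()`; the limit then applies per parallel subtask, so the job-wide rate is roughly parallelism times the configured rate.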

Thanks
Ray

Re: Flink operator throttle

Benchao Li (May 14, 2020)
AFAIK, `FlinkKafkaConsumer010#setRateLimiter` lets you configure a rate limiter for the Kafka source
(I assume you use Kafka).
However, it only exists in the Kafka 0.10 DataStream connector, not in other connector versions or in the Table API.
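
For reference, my understanding (an assumption to verify against your Flink version) is that `GuavaFlinkConnectorRateLimiter`, the limiter typically passed to `FlinkKafkaConsumer010#setRateLimiter`, takes a job-wide rate in bytes per second, divides it evenly among the consumer's parallel subtasks, and charges each fetched batch by its size in bytes. The plain-Java sketch below only reproduces that arithmetic; `SubtaskRateBudget` and its methods are hypothetical.

```java
/** Hypothetical helpers mirroring an (assumed) byte-based, per-subtask rate split. */
public final class SubtaskRateBudget {
    private SubtaskRateBudget() {}

    /** Equal share of a job-wide byte rate for one parallel subtask. */
    public static long localRate(long globalBytesPerSecond, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        return globalBytesPerSecond / parallelism;
    }

    /** Permits charged for one fetched batch: its total size in bytes, at least 1. */
    public static int permitsForBatch(byte[][] records) {
        long total = 0;
        for (byte[] record : records) {
            total += record.length;
        }
        return (int) Math.min(Integer.MAX_VALUE, Math.max(1L, total));
    }

    /** Time a subtask needs to "pay" for a batch at its local rate. */
    public static double secondsForBatch(int permits, long localBytesPerSecond) {
        if (localBytesPerSecond <= 0) {
            throw new IllegalArgumentException("local rate must be positive");
        }
        return (double) permits / localBytesPerSecond;
    }
}
```

With a global rate of 1,000,000 bytes/s and parallelism 4, each subtask gets 250,000 bytes/s, so a 50,000-byte batch costs that subtask about 0.2 s.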

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]

Re: Flink operator throttle

王雷 (May 17, 2020)
Hi Benchao

Thanks for your answer!

Following your answer, I found `GuavaFlinkConnectorRateLimiter`, which is an implementation of `FlinkConnectorRateLimiter`.

If I want to use the rate limiter in other connectors, such as the Kafka sink or the ES sink, I need to do some extra work on those connectors.

Does the community plan to provide a lower-level implementation for all connectors, including the Table API and SQL?

Thanks
Ray

Re: Flink operator throttle

Benchao Li (May 17, 2020)
Hi,

> If I want to use the rate limiter in other connectors, such as the Kafka sink or the ES sink, I need to do some extra work on those connectors.
Yes, you can do this by modifying the Kafka/ES sinks; in fact, this is how we do it internally.
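
Modifying a sink this way amounts to making each write wait for its time slot before it reaches the external system. The sketch below shows that idea as a generic wrapper around a `Consumer<T>`; `ThrottledWriter` and its `Sleeper` interface are hypothetical names, and in an actual Flink sink the same check would sit at the start of `invoke()`. It enforces a fixed minimum interval between writes, which is simpler than a token bucket and allows no bursts.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical sink-side throttle: spaces writes at least 1/maxWritesPerSecond apart. */
public class ThrottledWriter<T> implements Consumer<T> {
    /** Abstraction over sleeping so the pacing can be tested without real waits. */
    public interface Sleeper {
        void sleepNanos(long nanos) throws InterruptedException;
    }

    private final Consumer<T> delegate; // the code that actually writes to the database
    private final long intervalNanos;   // minimum spacing between two writes
    private final LongSupplier clock;
    private final Sleeper sleeper;
    private long nextSlotNanos;

    public ThrottledWriter(Consumer<T> delegate, double maxWritesPerSecond,
                           LongSupplier clock, Sleeper sleeper) {
        if (maxWritesPerSecond <= 0) {
            throw new IllegalArgumentException("rate must be positive");
        }
        this.delegate = delegate;
        this.intervalNanos = (long) (1e9 / maxWritesPerSecond);
        this.clock = clock;
        this.sleeper = sleeper;
        this.nextSlotNanos = clock.getAsLong();
    }

    public ThrottledWriter(Consumer<T> delegate, double maxWritesPerSecond) {
        this(delegate, maxWritesPerSecond, System::nanoTime, TimeUnit.NANOSECONDS::sleep);
    }

    @Override
    public void accept(T value) {
        long now = clock.getAsLong();
        if (now < nextSlotNanos) {
            try {
                sleeper.sleepNanos(nextSlotNanos - now); // wait for this write's slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while throttling", e);
            }
        }
        // The next slot is one interval after this write's slot (or after now, if we were idle).
        nextSlotNanos = Math.max(nextSlotNanos, now) + intervalNanos;
        delegate.accept(value);
    }
}
```

Because each parallel sink instance paces itself independently, the total rate against the database is roughly parallelism times `maxWritesPerSecond`.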

> Does the community plan to provide a lower-level implementation for all connectors, including the Table API and SQL?
As far as I know, there is no ongoing work on this. Usually we should rely on back pressure for this.
Let's hear from others whether this is a valid need.
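
Back pressure reaches a similar end state without an explicit rate: when the slowest component cannot keep up, bounded buffers fill and upstream producers block. The Java sketch below is an analogy only (Flink implements this with bounded network buffers, not with this class): a producer writes into an `ArrayBlockingQueue` drained by a deliberately slow consumer and counts how often it found the buffer full. `BackPressureDemo` is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Analogy for back pressure: a bounded buffer slows a fast producer to the consumer's pace. */
public class BackPressureDemo {
    /** Outcome of one run. */
    public static final class Result {
        public final int consumed;    // records the slow consumer processed
        public final int blockedPuts; // times the producer found the buffer full and had to wait

        public Result(int consumed, int blockedPuts) {
            this.consumed = consumed;
            this.blockedPuts = blockedPuts;
        }
    }

    public static Result run(int records, int capacity, long consumerDelayMillis)
            throws InterruptedException {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int[] consumed = {0};
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    buffer.take();
                    TimeUnit.MILLISECONDS.sleep(consumerDelayMillis); // simulated slow database write
                    consumed[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        int blockedPuts = 0;
        for (int i = 0; i < records; i++) {
            if (!buffer.offer(i)) { // buffer full: this is the back pressure
                blockedPuts++;
                buffer.put(i);      // block until the consumer frees a slot
            }
        }
        consumer.join(); // join() also makes consumed[0] safely visible here
        return new Result(consumed[0], blockedPuts);
    }
}
```

With 20 records, a buffer of 2 and a 5 ms consumer delay, the producer typically waits on most of its puts and ends up running at the consumer's pace, which is the passive behaviour the original question wanted to get ahead of.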

Re: Flink operator throttle

王雷
I see. I appreciate your help, thank you so much!

Re: Flink operator throttle

Chen Qin (May 18, 2020)
In reply to this post by Benchao Li
Hi Ray,

From a more abstract point of view, you can always throttle the source and thereby get proper throughput control at the sink.
One approach might be simply to override the base `KafkaFetcher` and use the shaded Guava rate limiter.
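
Throttling at the source could look roughly like the sketch below, which wraps any `Iterator<T>` (standing in for the loop that hands fetched records downstream) and admits at most `recordsPerSecond` records per one-second window. `ThrottledIterator` is hypothetical and is not how `KafkaFetcher` is structured; it uses a fixed-window counter, a coarser technique than Guava's smoothed `RateLimiter`, chosen here only because it is short.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical source-side throttle using a fixed one-second window counter. */
public class ThrottledIterator<T> implements Iterator<T> {
    /** Abstraction over sleeping so the throttle can be tested without real waits. */
    public interface Sleeper {
        void sleepNanos(long nanos) throws InterruptedException;
    }

    private static final long WINDOW_NANOS = 1_000_000_000L;

    private final Iterator<T> source;   // stands in for the records a fetcher would emit
    private final int recordsPerSecond; // budget per window
    private final LongSupplier clock;
    private final Sleeper sleeper;
    private long windowStart;
    private int emittedInWindow;

    public ThrottledIterator(Iterator<T> source, int recordsPerSecond,
                             LongSupplier clock, Sleeper sleeper) {
        if (recordsPerSecond <= 0) {
            throw new IllegalArgumentException("rate must be positive");
        }
        this.source = source;
        this.recordsPerSecond = recordsPerSecond;
        this.clock = clock;
        this.sleeper = sleeper;
        this.windowStart = clock.getAsLong();
    }

    public ThrottledIterator(Iterator<T> source, int recordsPerSecond) {
        this(source, recordsPerSecond, System::nanoTime, TimeUnit.NANOSECONDS::sleep);
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public T next() {
        if (!source.hasNext()) {
            throw new NoSuchElementException();
        }
        long now = clock.getAsLong();
        if (now - windowStart >= WINDOW_NANOS) { // a new window has begun: reset the budget
            windowStart = now;
            emittedInWindow = 0;
        }
        if (emittedInWindow >= recordsPerSecond) { // budget spent: wait for the next window
            try {
                sleeper.sleepNanos(windowStart + WINDOW_NANOS - now);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while throttling", e);
            }
            windowStart += WINDOW_NANOS;
            emittedInWindow = 0;
        }
        emittedInWindow++;
        return source.next();
    }
}
```

Fixed windows can let up to twice the budget through around a window boundary; a smoothed limiter avoids that at the cost of more code.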


Best,

Chen

Re: Flink operator throttle

Alexander Fedulov
Hi Chen,

Just a small comment regarding your proposal: this would work well when every message passes straight through. If the pipeline contains filtering that may depend on the incoming stream data itself, the output throughput (the goal of the throttling) would be hard to control precisely.
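
The effect described here is easy to see with a toy example: if the source is throttled to R records per second and a filter keeps a fraction s of them, the sink receives s·R records per second, and s depends on the data. In the hypothetical sketch below, two seconds of input at the same throttled rate of 10 records/s produce very different sink rates.

```java
import java.util.Arrays;
import java.util.function.IntPredicate;

/** Toy model: the source is throttled, but the sink rate depends on how many records a filter keeps. */
public class FilteredThroughput {
    /** Records reaching the sink out of one second's worth of (already throttled) source output. */
    public static long sinkRecordsPerSecond(int[] oneSecondOfInput, IntPredicate filter) {
        return Arrays.stream(oneSecondOfInput).filter(filter).count();
    }

    public static void main(String[] args) {
        IntPredicate keepEven = v -> v % 2 == 0;            // a data-dependent filter
        int[] second1 = {2, 4, 6, 8, 10, 12, 14, 16, 1, 3}; // 10 records, 8 even
        int[] second2 = {1, 3, 5, 7, 9, 11, 13, 15, 2, 4};  // 10 records, 2 even
        System.out.println(sinkRecordsPerSecond(second1, keepEven)); // 8
        System.out.println(sinkRecordsPerSecond(second2, keepEven)); // 2
    }
}
```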

Best,

--

Alexander Fedulov | Solutions Architect

+49 1514 6265796

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany