FLINK Kinesis Connector Error - ProvisionedThroughputExceededException

FLINK Kinesis Connector Error - ProvisionedThroughputExceededException

Steve Bistline


I am getting this error from the Flink Kinesis Connector. I have a native KCL app running in parallel with no problems. 


Any help would be appreciated


Thanks so much!!


Steve


flink-sbistl919-taskexecutor-0-CACSVML-15736.log:2018-11-09 07:46:11,579 WARN  org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy  - Got recoverable SdkClientException. Backing off for 258 millis (Rate exceeded for shard shardId-000000000000 in stream CSV under account  xxxxxxxxx  . (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: e1c0caa4-8c4c-7738-b59f-4977bc762cf3))

flink-sbistl919-taskexecutor-0-CACSVML-15736.log:2018-11-09 07:46:16,844 WARN  org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy  - Got recoverable SdkClientException. Backing off for 203 millis (Rate exceeded for shard shardId-000000000001 in stream CSV under account xxxxxxxxx. (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: f7d22c26-96f6-c547-a38d-affe493cd2e1))


Re: FLINK Kinesis Connector Error - ProvisionedThroughputExceededException

shkob1
If it's running in parallel, aren't you just adding readers, which maxes out
your provisioned throughput? This probably isn't a Flink issue but rather a
Kinesis one; I suggest increasing your number of shards.
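
A rough sketch of the arithmetic behind this suggestion. Kinesis documents a per-shard limit of 5 read transactions (GetRecords calls) per second, shared by every reader of that shard. The reader names and poll intervals below are assumptions for illustration only; they are not taken from Steve's setup.

```python
# Documented Kinesis limit: up to 5 read transactions (GetRecords calls)
# per second per shard, shared by all applications reading that shard.
SHARD_READ_TPS_LIMIT = 5


def getrecords_calls_per_second(poll_interval_ms: float) -> float:
    """GetRecords calls per second that one reader issues against one shard."""
    return 1000.0 / poll_interval_ms


def total_calls_per_shard(poll_intervals_ms) -> float:
    """Combined GetRecords call rate of all readers polling the same shard."""
    return sum(getrecords_calls_per_second(ms) for ms in poll_intervals_ms)


# Hypothetical readers (assumed intervals, not measured values):
# one polls every 200 ms, the other every 1000 ms.
readers = {"flink_consumer": 200, "kcl_app": 1000}
total = total_calls_per_shard(readers.values())
over_limit = total > SHARD_READ_TPS_LIMIT
print(f"{total} calls/s per shard, over limit: {over_limit}")
```

Under these assumed intervals the two readers together issue 6 calls per second against each shard, one more than the limit, even though each reader alone stays within it. In the Flink consumer the poll interval is, as far as I know, controlled by the `flink.shard.getrecords.intervalmillis` property; check the connector documentation for your version before relying on that name or its default.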



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: FLINK Kinesis Connector Error - ProvisionedThroughputExceededException

Rafi Aroch
Hi Steve,

We've encountered this as well. We have far more than enough shards, but were still getting exceptions.
We think we know the reason, and we would love for someone to confirm or refute it.

What we suspect is happening is as follows:

The KPL's RateLimit parameter tracks the amount of bytes/records written to a specific shard.
If the parallelism of your sink is >1 (which is probably the case), there are multiple tasks and therefore multiple KPL instances, which may be writing to the same shard.
So for each individual KPL the RateLimit is not breached, but if multiple parallel tasks write to the same shard, their combined rate breaches it and a ProvisionedThroughputExceededException is thrown.
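
A minimal sketch of the arithmetic behind this hypothesis, assuming each KPL instance enforces RateLimit independently, as a percentage of the shard's write limit. The value 150 is, as far as I know, the KPL default; the parallelism of 4 and both function names are made up for illustration.

```python
def worst_case_shard_load_pct(rate_limit_pct: float, sink_parallelism: int) -> float:
    """Upper bound on the load one shard can see (in % of its write limit)
    if every sink subtask's KPL instance writes to that shard at its own RateLimit."""
    return rate_limit_pct * sink_parallelism


def per_instance_rate_limit(sink_parallelism: int, target_pct: float = 100.0) -> int:
    """RateLimit each instance would need so that the combined worst case
    stays at or below target_pct of the shard limit (never below 1)."""
    return max(1, int(target_pct // sink_parallelism))


# Assumed values: RateLimit=150 per instance, 4 parallel sink subtasks.
worst_case = worst_case_shard_load_pct(150, 4)
suggested = per_instance_rate_limit(4)
print(f"worst case: {worst_case}% of shard limit; per-instance RateLimit: {suggested}")
```

If the hypothesis holds, dividing the RateLimit by the sink parallelism would bound the combined rate without reducing parallelism, at the cost of throttling each subtask more than necessary when the load is spread unevenly. This is untested here and only follows from the assumption stated above.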

What we've tried:
  • Using a random partition key to spread the load evenly across the shards. This did not work for us...
  • We tried to ensure that records destined for the same shard are written by the same KPL instance, so that the RateLimit would be enforced. We did a keyBy before the sink so that the same records go to the same task, using the same key for the keyBy as for the Kinesis partitionKey. This did not work for us...
What solved it eventually:

Reducing the parallelism of the FlinkKinesisProducer to 1. We also set a queueSize so that we get back-pressured under high load (instead of getting ProvisionedThroughputExceededException). This solved the problem and is currently not a bottleneck for us, but it may become one soon, so this is not a real solution.
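
To illustrate the queue-limit idea only, here is a toy model in which a sender refuses new records once too many are outstanding. `BoundedOutbox` is a hypothetical class, not part of Flink or the KPL, and the real producer blocks the calling task rather than returning False.

```python
from collections import deque


class BoundedOutbox:
    """Toy stand-in for a sink with a limit on outstanding records (hypothetical)."""

    def __init__(self, queue_limit: int):
        self.queue_limit = queue_limit
        self.pending = deque()

    def offer(self, record) -> bool:
        """Accept the record if there is room; False means the caller must wait,
        which is how back-pressure propagates upstream."""
        if len(self.pending) >= self.queue_limit:
            return False
        self.pending.append(record)
        return True

    def drain(self, n: int) -> None:
        """Simulate up to n outstanding records being acknowledged."""
        for _ in range(min(n, len(self.pending))):
            self.pending.popleft()


outbox = BoundedOutbox(queue_limit=3)
accepted = [outbox.offer(i) for i in range(5)]  # the last two are refused
outbox.drain(2)                                 # two records acknowledged
accepted_after_drain = outbox.offer(99)         # room again
print(accepted, accepted_after_drain)
```

The point of the limit is that the sink slows its upstream operators down instead of pushing more records at Kinesis than it can absorb. In the Flink producer the corresponding setting is, as far as I know, `setQueueLimit`; treat that name as something to verify against your connector version.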

Can anyone suggest a better solution? Approve/reject our assumption?

Thanks
Rafi



Re: FLINK Kinesis Connector Error - ProvisionedThroughputExceededException

Tzu-Li (Gordon) Tai
Hi all,

I think Steve's occurrence of the warning was from the consumer side.

For the Flink Kinesis Consumer, this most likely occurs due to excessive ListShards API calls on the target Kinesis stream. The consumer uses this API to discover shards at a fixed interval.
The problem with the current design is that all subtasks of the consumer try to discover shards, so during discovery AWS's service rate limit may be hit.
The community is well aware of this shortcoming, and AFAIK, we have some plans to address this for Flink 1.8 / 1.9.
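
A back-of-the-envelope sketch of how the discovery call rate scales with consumer parallelism, assuming every subtask issues one discovery call per interval. The parallelism, the interval and the API limit used below are placeholder assumptions, not values from this thread or from the AWS documentation.

```python
def discovery_calls_per_second(consumer_parallelism: int,
                               discovery_interval_ms: float) -> float:
    """Average shard-discovery calls per second against one stream when
    every consumer subtask runs discovery once per interval."""
    return consumer_parallelism * 1000.0 / discovery_interval_ms


def min_interval_ms(consumer_parallelism: int, api_limit_tps: float) -> float:
    """Smallest discovery interval that keeps the average call rate
    at or below api_limit_tps."""
    return consumer_parallelism * 1000.0 / api_limit_tps


# Placeholder values: 32 subtasks, discovery every 10 s, an assumed limit of 2 calls/s.
rate = discovery_calls_per_second(32, 10_000)
needed = min_interval_ms(32, 2.0)
print(f"{rate} calls/s on average; interval needed for 2 calls/s: {needed} ms")
```

This only models the average rate. If all subtasks start at the same time their calls arrive in bursts, so the limit can be hit even when the average looks safe.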

@Rafi, as for the producer side, you may want to take a look at providing a FlinkKinesisPartitioner. By default, records are partitioned round-robin, i.e. records received by a subtask of the Kinesis sink can end up in any of the Kinesis shards.
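
To make the partitioning discussion concrete, here is a sketch of how Kinesis assigns a record to a shard: the partition key is hashed with MD5 into a 128-bit integer, and each shard owns a range of that hash space. The sketch assumes the shards split the hash space evenly and that the digest is read as a big-endian integer; all function names are hypothetical.

```python
import hashlib

HASH_SPACE = 2 ** 128  # size of the 128-bit hash key space


def hash_key(partition_key: str) -> int:
    """128-bit integer derived from the MD5 digest of the partition key."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_for_hash(hash_value: int, num_shards: int) -> int:
    """Index of the shard owning hash_value, assuming evenly split ranges."""
    return hash_value * num_shards // HASH_SPACE


def shard_for_key(partition_key: str, num_shards: int) -> int:
    """Shard index a partition key maps to under the same assumption."""
    return shard_for_hash(hash_key(partition_key), num_shards)


def explicit_hash_key_for_shard(shard_index: int, num_shards: int) -> int:
    """A hash key in the middle of the given shard's range, usable to pin
    records to that shard instead of relying on the partition key's hash."""
    width = HASH_SPACE // num_shards
    return shard_index * width + width // 2


# Pin each of 4 hypothetical sink subtasks to one of 4 shards.
pinned = [shard_for_hash(explicit_hash_key_for_shard(i, 4), 4) for i in range(4)]
print(pinned)
```

Because Flink's keyBy and Kinesis hash keys differently, routing records by the same key in both places does not by itself make one subtask own one shard. A custom partitioner that supplies an explicit hash key per subtask, along the lines of `explicit_hash_key_for_shard`, is one way to get such a mapping; whether the connector version in use exposes an explicit-hash-key hook should be checked in its documentation.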

Cheers,
Gordon 


Re: FLINK Kinesis Connector Error - ProvisionedThroughputExceededException

Rafi Aroch
Hi Gordon,

Thanks for the reply.

So is it correct to say that the KPL RateLimit is not effectively enforced when the sink parallelism is >1? If multiple subtasks are writing to the same shard and each has its own RateLimit, their combined rate may exceed it.

If that's the case, can you suggest a way to overcome this?

Thanks,
Rafi

