FlinkKinesisProducer blocking ?


FlinkKinesisProducer blocking ?

Vijay Balakrishnan
Hi,
Current setup:

Kinesis stream 1 -----> Kinesis Analytics Flink -----> Kinesis stream 2
                                   |
                                   ----> Firehose Delivery stream

Curl error:
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader  - [2020-07-02 15:22:32.203053] [0x000007f4][0x00007ffbced15700] [error] [AWS Log: ERROR](CurlHttpClient)Curl returned error code 28

But I am still seeing tons of the curl error code 28. I use a parallelism of 80 for the sink to the Kinesis Data Stream (KDS), which seems to point to KDS being pounded with too many requests: 80 (parallelism) * 10 (ThreadPoolSize) = 800 concurrent requests. Is my understanding correct? So, maybe reduce the parallelism of 80?
I still don't understand why the logs are stuck with just FlinkKinesisProducer for around 4s (blocking calls???) while the rest of the Flink Analytics application produces no logs during that time.
I noticed that the FlinkKinesisProducer took about 3.785s, 3.984s, and 4.223s in between other application logs in Kibana when the Kinesis GetIterator Age peaked. It seemed like FlinkKinesisProducer was blocking for that long while the Flink app was not able to generate any other logs.

Looked at this:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure

Could use this:
producerConfig.put("RequestTimeout", "10000"); // raised from the default 6000 ms

But that doesn't really solve the problem when trying to maintain a real-time processing system.
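
For reference, here is roughly how the producer is wired up in my job (a simplified sketch; the region, stream name, and outputStream variable are placeholders, not the exact code):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder region
producerConfig.put("RequestTimeout", "10000"); // KPL setting, raised from the default 6000 ms

FlinkKinesisProducer<String> kinesis =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setDefaultStream("kinesis-stream-2"); // placeholder stream name
kinesis.setDefaultPartition("0");

// outputStream is the DataStream<String> produced by the rest of the job
outputStream.addSink(kinesis).setParallelism(80);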

TIA



Re: FlinkKinesisProducer blocking ?

Tzu-Li (Gordon) Tai
Hi Vijay,

The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
It does however apply backpressure (therefore effectively blocking all
upstream operators) when the number of outstanding records accumulated
exceeds a set limit, configured using the FlinkKinesisProducer#setQueueLimit
method.

For starters, you can maybe check if that was set appropriately.
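
For example, something like this (a minimal sketch; the actual limit should be derived from your record size and shard count):

// A bounded queue makes the sink backpressure instead of buffering (and eventually dropping) records
kinesis.setQueueLimit(1000); // placeholder value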

Cheers,
Gordon




Re: FlinkKinesisProducer blocking ?

Vijay Balakrishnan
Thanks, Gordon, for your reply.

I do not set a queue limit, so the default queue size is effectively unbounded at 2147483647. So it should just be dropping records produced from the 80 (parallelism) * 10 (ThreadPoolSize) = 800 threads, based on RecordTtl. I do not want backpressure since, as you said, it effectively blocks all upstream operators.

But from what you are saying, will it only apply backpressure when the number of outstanding records exceeds the default queue limit of 2147483647, or does it also do so when Kinesis rate-limits it to 1 MB per second per shard? The second case, rate limiting by Kinesis, seems more probable.

So, calculating the queue limit:
Based on this, my record size = 1600 bytes, and I have 96 shards.
Assuming that, with the default RecordMaxBufferedTime of 100 ms, a queue size of 100 KB per shard should be sufficient:
Queue limit with 96 shards = (96 * 10^5) / 1600 = 6000
Queue limit with 4 shards = (4 * 10^5) / 1600 = 250

According to the docs:

By default, FlinkKinesisProducer does not backpressure. Instead, records that cannot be sent because of the rate restriction of 1 MB per second per shard are buffered in an unbounded queue and dropped when their RecordTtl expires.

To avoid data loss, you can enable backpressuring by restricting the size of the internal queue:

// 200 Bytes per record, 1 shard
kinesis.setQueueLimit(500);
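
So for my case, I guess it would look roughly like this (a sketch only, assuming the 6000 figure above is the right ballpark for 1600-byte records across 96 shards):

// ~100 KB buffered per shard: (96 shards * 10^5 bytes) / 1600 bytes per record ≈ 6000 records
kinesis.setQueueLimit(6000);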


Re: FlinkKinesisProducer blocking ?

Vijay Balakrishnan
Hi Gordon,
ThreadPoolSize defaults to 10. I have a parallelism of 80 spread out across 32 nodes.
Could it be that the 80 subtasks get bottlenecked on a common thread pool of 10, or is it spawning 80 * 10 threads in total? The Flink TaskManagers run in separate slots/vCPUs and can be spread across 32 nodes in my case, occupying 80 slots/vCPUs. Is my understanding correct, and could this be the reason that the KPL gets flooded with too many pending requests at regular intervals?

TIA,


Re: FlinkKinesisProducer blocking ?

Tzu-Li (Gordon) Tai
Hi Vijay,

ThreadPoolSize is per Kinesis producer, and there is one producer for each parallel subtask.
If you are constantly hitting the 1MB per second per shard quota, then the records will be buffered by the FlinkKinesisProducer.
During this process, backpressure is not applied if you have not configured an upper bound for the buffer queue.
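
For example, the pool size is read from the producer config by each subtask's KPL instance (a sketch; the value shown is just the default):

// Applied per subtask, so the total number of KPL request threads is roughly parallelism * ThreadPoolSize
producerConfig.put("ThreadPoolSize", "10"); // default; with 80 subtasks that is ~800 threads overall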

One other thing to note, which might explain the backpressure at regular intervals that you are experiencing,
is that the FlinkKinesisProducer needs to flush all pending records in the buffer before the checkpoint can complete for the sink.
That would also apply backpressure upstream.

Gordon


Re: FlinkKinesisProducer blocking ?

Vijay Balakrishnan
Thanks Gordon,
So, 10 (ThreadPoolSize) * 80 sub-tasks = 800 threads go into a queue (unbounded by default), which then goes through the KPL MaxConnections (24 by default) to KDS.

This suggests I need to decrease the number of sub-tasks, or setQueueLimit(800) and increase MaxConnections to 256 (the maximum allowed).
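Roughly like this (a sketch of what I have in mind, not something I have validated yet):

producerConfig.put("MaxConnections", "256"); // KPL cap on concurrent connections to Kinesis (default 24, max 256)
kinesis.setQueueLimit(800); // bound the per-subtask buffer so the sink backpressures instead of dropping on RecordTtl expiry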
Checkpointing is not currently enabled.

Pls correct me if I am wrong.
