Hi,

Current setup:

Kinesis stream 1 ---> Kinesis Data Analytics (Flink) ---> Kinesis stream 2
                                                     \--> Firehose delivery stream

Curl error:

org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-07-02 15:22:32.203053] [0x000007f4][0x00007ffbced15700] [error] [AWS Log: ERROR](CurlHttpClient)Curl returned error code 28

I am still seeing tons of these curl 28 errors. I use a parallelism of 80 for the sink to the Kinesis Data Stream (KDS), which seems to point to KDS being pounded with too many requests: 80 (parallelism) * 10 (ThreadPoolSize) = 800 requests. Is my understanding correct? If so, should I reduce the parallelism of 80?

I still don't understand why the logs are stuck on just the FlinkKinesisProducer for around 4 s (blocking calls?) while the rest of the Flink Analytics application produces no logs. In Kibana, I noticed that the FlinkKinesisProducer took about 3.785 s, 3.984 s, and 4.223 s in between other application logs when the Kinesis GetIterator Age peaked. It seemed like the FlinkKinesisProducer was blocking for that long while the Flink app was not able to generate any other logs.

Looked at this:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure

Could use this:
producerConfig.put("RequestTimeout", "10000"); // from 6000

but that doesn't really solve the problem when trying to maintain a real-time processing system.

TIA
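For reference, a minimal sketch of how the sink side of the job is wired up, assuming the standard FlinkKinesisProducer setup from the connector docs (the region, stream name, and the outputStream variable are placeholders, not taken from the actual job):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-west-2"); // placeholder region
producerConfig.put("RequestTimeout", "10000");                  // raised from 6000 as above
producerConfig.put("ThreadPoolSize", "10");                     // connector default, per producer

FlinkKinesisProducer<String> sink =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
sink.setDefaultStream("kinesis-stream-2"); // placeholder stream name
sink.setDefaultPartition("0");

// outputStream is the DataStream<String> produced by the rest of the job;
// 80 parallel sink subtasks means 80 KPL producer instances
outputStream.addSink(sink).setParallelism(80);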
Hi Vijay,
The FlinkKinesisProducer does not use blocking calls to the AWS KDS API. It does however apply backpressure (therefore effectively blocking all upstream operators) when the number of outstanding records accumulated exceeds a set limit, configured using the FlinkKinesisProducer#setQueueLimit method. For starters, you can maybe check if that was set appropriately. Cheers, Gordon -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Thanks, Gordon, for your reply.

I do not set a queueLimit, so the default unbounded queue size is 2147483647. So it should just be dropping records, based on RecordTtl, produced from the 80 (parallelism) * 10 (ThreadPoolSize) = 800 threads. I do not want backpressure since, as you said, it effectively blocks all upstream operators.

But from what you are saying, will it apply backpressure only when the number of outstanding records accumulated exceeds the default queue limit of 2147483647, or does it also do so when it is rate-limited to 1 MB per second per shard by Kinesis? The second case, rate limiting by Kinesis, seems more probable.

So, calculating the queue limit: my record size = 1600 bytes and I have 96 shards. Assuming, as the docs suggest, that with the default RecordMaxBufferedTime of 100 ms a queue size of 100 kB per shard should be sufficient:

Queue size per shard = 100 kB
Queue limit with 96 shards = (96 * 10^5) / 1600 = 6000
Queue limit with 4 shards = (4 * 10^5) / 1600 = 250

According to the docs, the producer does not backpressure by default; to avoid data loss, you can enable backpressuring by restricting the size of the internal queue.
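Applied to my numbers, that would be something like the following (a sketch only; the 6000 figure assumes the 1600-byte record size and 96 shards above hold):

// sink is the FlinkKinesisProducer instance
// ~100 kB of buffered data per shard, 96 shards, ~1600 bytes per record
// (96 * 100000) / 1600 = 6000 outstanding records before backpressure kicks in
sink.setQueueLimit(6000);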
On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Vijay,
Hi Gordon,

The ThreadPoolSize default is 10. I have a parallelism of 80 spread out across 32 nodes. Could it be that the 80 threads get bottlenecked on a common ThreadPool of 10, or is it spawning 80 * 10 threads in total? The Flink TaskManagers run in separate slots/vCPUs and in my case can be spread across 32 nodes while occupying 80 slots/vCPUs. Is my understanding correct, and could this be the reason that the KPL gets flooded with too many pending requests at regular intervals?

TIA,

On Thu, Jul 9, 2020 at 12:15 PM Vijay Balakrishnan <[hidden email]> wrote:
Hi Vijay,

ThreadPoolSize is per Kinesis producer, and there is one producer for each parallel subtask.

If you are constantly hitting the 1 MB per second per shard quota, then the records will be buffered by the FlinkKinesisProducer. During this process, backpressure is not applied if you have not configured an upper bound for the buffer queue.

One other thing to note, which might explain the backpressure at regular intervals that you are experiencing, is that the FlinkKinesisProducer needs to flush all pending records in the buffer before the checkpoint can complete for the sink. That would also apply backpressure upstream.

Gordon

On Fri, Jul 10, 2020 at 7:02 AM Vijay Balakrishnan <[hidden email]> wrote:
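In other words, a parallelism of 80 means 80 independent KPL instances, each with its own thread pool; the pool is not shared across subtasks. If the per-producer pool needs tuning, my understanding is that the KPL properties are passed straight through the producer config, roughly like this (values illustrative only):

// per-subtask KPL tuning; each of the 80 sink subtasks has its own KPL producer
producerConfig.put("ThreadingModel", "POOLED"); // the connector's default threading model
producerConfig.put("ThreadPoolSize", "10");     // threads per producer, not shared across subtasks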
Thanks Gordon,

So, 10 (ThreadPoolSize) * 80 sub-tasks = 800 threads feed a queue (unbounded by default), which then goes through the KPL MaxConnections (24 by default) to KDS. This suggests I need to decrease the number of sub-tasks, or use setQueueLimit(800) and increase MaxConnections to 256 (the maximum allowed). Checkpointing is not currently enabled. Please correct me if I am wrong.

On Tue, Jul 21, 2020 at 7:40 PM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
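In code, the proposed change would look roughly like this (a sketch of the idea above, not a validated configuration; whether 800 and 256 are the right numbers depends on record size and shard count):

// bound the internal KPL buffer so the sink backpressures instead of dropping on RecordTtl
sink.setQueueLimit(800);
// allow more concurrent connections from each KPL instance to Kinesis (KPL default is 24)
producerConfig.put("MaxConnections", "256");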