Validating my understanding of SHARD_DISCOVERY_INTERVAL_MILLIS

Vijay Balakrishnan
Hi,
I see these two constants: SHARD_GETRECORDS_INTERVAL_MILLIS and SHARD_DISCOVERY_INTERVAL_MILLIS.

My understanding is that SHARD_GETRECORDS_INTERVAL_MILLIS defines how often records are fetched from a Kinesis Data Stream (KDS). The code seems to do this in ShardConsumer.run() --> getRecords().
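For reference, the polling behavior described above can be sketched as a simple loop: fetch a batch, then pause for the GetRecords interval before the next fetch. This is a hypothetical, simplified model, not Flink's ShardConsumer; `fetch` stands in for a Kinesis GetRecords call and the `sleeper` parameter is an assumption that lets the pause be recorded instead of actually waited out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/** Hypothetical sketch of a per-shard polling loop; NOT Flink's ShardConsumer. */
class ShardPollerSketch {

    /**
     * Calls fetch (a stand-in for GetRecords) maxPolls times, pausing
     * getRecordsIntervalMillis between consecutive calls via sleeper.
     * Returns all fetched records in order.
     */
    static List<String> poll(Supplier<List<String>> fetch, long getRecordsIntervalMillis,
                             int maxPolls, LongConsumer sleeper) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            out.addAll(fetch.get());
            if (i < maxPolls - 1) {
                // This pause is the knob SHARD_GETRECORDS_INTERVAL_MILLIS is meant to control.
                sleeper.accept(getRecordsIntervalMillis);
            }
        }
        return out;
    }

    /** A real sleeper: blocks the thread and restores the interrupt flag if interrupted. */
    static void realSleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<Long> sleeps = new ArrayList<>();
        int[] batch = {0};
        // Record the requested pauses instead of sleeping, so this runs instantly.
        List<String> records = poll(() -> List.of("record-" + batch[0]++), 2000L, 3, sleeps::add);
        System.out.println(records); // [record-0, record-1, record-2]
        System.out.println(sleeps);  // [2000, 2000]
    }
}
```

Passing `ShardPollerSketch::realSleep` instead of `sleeps::add` gives the wall-clock behavior.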

SHARD_DISCOVERY_INTERVAL_MILLIS defines how often the FlinkKinesisConsumer checks whether the stream's shards have changed. We don't change shards while our application is running, so I set it to a very high value to avoid this check: with 282 shards I was running into LimitExceededException errors on ListShards calls.
Is this a correct understanding of these two constants, especially SHARD_DISCOVERY_INTERVAL_MILLIS?
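A rough way to see why a short discovery interval can trip LimitExceededException: if every parallel source subtask lists shards once per discovery round, total ListShards traffic grows with parallelism and shrinks as the interval grows. The sketch below assumes exactly one ListShards call per subtask per round (pagination and retries ignored), and the parallelism of 60 is a made-up example, not a value from this thread.

```java
/** Back-of-the-envelope ListShards rate, under the assumptions stated above. */
class ListShardsRateSketch {

    /** Approximate ListShards calls per second across all subtasks. */
    static double callsPerSecond(int parallelism, long discoveryIntervalMillis) {
        if (discoveryIntervalMillis <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        return parallelism * 1000.0 / discoveryIntervalMillis;
    }

    public static void main(String[] args) {
        int parallelism = 60; // hypothetical
        System.out.println(callsPerSecond(parallelism, 10_000L));     // default 10 s interval: 6.0
        System.out.println(callsPerSecond(parallelism, 18_000_000L)); // 5-hour interval
    }
}
```

Raising the interval from 10 s to 5 hours cuts the rate by a factor of 1800, independent of the parallelism.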

My assumption, which I'd like validated:
SHARD_DISCOVERY_INTERVAL_MILLIS should not affect record fetching, which is governed by SHARD_GETRECORDS_INTERVAL_MILLIS.

Code below:
// Poll each shard with GetRecords every getRecsIntervalMs (2000 ms here).
// Properties.setProperty takes String values, hence String.valueOf.
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, String.valueOf(getRecsIntervalMs)); // 2000

/*
 * We do not change shards while the app is running, so we can raise
 * SHARD_DISCOVERY_INTERVAL_MILLIS to a very high value and avoid rate limiting
 * (LimitExceededException) from the AWS ListShards API.
 * The default is 10 s (10000 ms).
 */
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, String.valueOf(shardDiscoveryInterval)); // 18000000 ms = 5 hours
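The magic numbers in the comments (2000, 18000000) can be derived from java.time.Duration instead, which makes the intended intervals explicit. A small sketch; the helper name `millis` is hypothetical.

```java
import java.time.Duration;

class IntervalStrings {

    /** Formats a Duration as the millisecond string that Properties.setProperty expects. */
    static String millis(Duration d) {
        return Long.toString(d.toMillis());
    }

    public static void main(String[] args) {
        System.out.println(millis(Duration.ofSeconds(2))); // 2000
        System.out.println(millis(Duration.ofHours(5)));   // 18000000
        // In the consumer config this would be used as, e.g.:
        // kinesisConsumerConfig.setProperty(
        //     ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, millis(Duration.ofHours(5)));
    }
}
```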


TIA,

Re: Validating my understanding of SHARD_DISCOVERY_INTERVAL_MILLIS

Tzu-Li (Gordon) Tai
Hi Vijay,

Your assumption is correct that the discovery interval does not affect the interval of fetching records.
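To illustrate why the two settings are independent, here is a simplified, hypothetical model in which record fetching and shard discovery run as two separately scheduled periodic tasks. It is not Flink's actual threading model; it only shows that a very long discovery interval leaves the fetch cadence unchanged.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model: fetching and discovery as two independent periodic tasks. */
class IndependentLoopsSketch {

    /** Runs both tasks for runForMs of wall-clock time; returns {fetchCount, discoveryCount}. */
    static int[] run(long fetchIntervalMs, long discoveryIntervalMs, long runForMs) {
        AtomicInteger fetches = new AtomicInteger();
        AtomicInteger discoveries = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        try {
            // Stand-in for the getRecords polling loop.
            scheduler.scheduleWithFixedDelay(fetches::incrementAndGet,
                    0, fetchIntervalMs, TimeUnit.MILLISECONDS);
            // Stand-in for the periodic ListShards discovery; runs once at start, then every interval.
            scheduler.scheduleWithFixedDelay(discoveries::incrementAndGet,
                    0, discoveryIntervalMs, TimeUnit.MILLISECONDS);
            Thread.sleep(runForMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return new int[] {fetches.get(), discoveries.get()};
    }

    public static void main(String[] args) {
        // Fetch every 50 ms, discover every 10 s, observe for 500 ms.
        int[] counts = run(50, 10_000, 500);
        System.out.println("fetches=" + counts[0] + " discoveries=" + counts[1]);
    }
}
```

Changing the discovery interval from 10 s to hours only changes the second count; the fetch count depends solely on the fetch interval.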

As a side note, you can actually disable shard discovery by setting the value to -1.
The FlinkKinesisConsumer would then only call ListShards once, at job startup.
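A minimal sketch of that setting, using only java.util.Properties so it runs without Flink on the classpath. The key strings below are assumed to match the values of ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS and SHARD_GETRECORDS_INTERVAL_MILLIS; in real code pass the constants themselves, and check the connector documentation for your Flink version to confirm -1 is accepted there.

```java
import java.util.Properties;

class DisableShardDiscoverySketch {
    // Assumed to equal the ConsumerConfigConstants values; prefer the constants in real code.
    static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";
    static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";

    /** Returns a copy of base with periodic shard discovery disabled (-1, as described above). */
    static Properties withShardDiscoveryDisabled(Properties base) {
        Properties props = new Properties();
        props.putAll(base); // leave the caller's Properties untouched
        props.setProperty(SHARD_DISCOVERY_INTERVAL_MILLIS, "-1");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty(SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
        Properties props = withShardDiscoveryDisabled(base);
        System.out.println(props.getProperty(SHARD_DISCOVERY_INTERVAL_MILLIS)); // -1
    }
}
```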

Cheers,
Gordon

(In reply to Vijay Balakrishnan's message of Fri, Jul 10, 2020 at 2:35 AM.)