Kinesis connector SHARD_GETRECORDS_MAX default value

Kinesis connector SHARD_GETRECORDS_MAX default value

Steffen Hausmann
Hi there,

I recently ran into problems with a Flink job running on an EMR cluster
consuming events from a Kinesis stream receiving roughly 15k
events/second. Although the EMR cluster was scaled up substantially and
CPU utilization and system load were well below any alarming threshold,
the job fell increasingly behind in processing the stream.

Eventually, it turned out that SHARD_GETRECORDS_MAX defaults to 100,
which apparently causes too much overhead when consuming events from
the stream. Increasing the value to 5000 made the problem go away (a
single GetRecords call to Kinesis can retrieve up to 10k records).
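
For readers who want to apply the same change, here is a minimal sketch of the consumer properties. The two key strings are assumed to be the values behind ConsumerConfigConstants.SHARD_GETRECORDS_MAX and AWSConfigConstants.AWS_REGION in the connector; in a real job it is safer to reference those constants directly. The class name, the helper method and the region "us-east-1" are hypothetical.

```java
import java.util.Properties;

/** Sketch: builds Kinesis consumer properties with a larger GetRecords batch size. */
final class GetRecordsMaxConfig {

    // Assumption: string values of ConsumerConfigConstants.SHARD_GETRECORDS_MAX
    // and AWSConfigConstants.AWS_REGION. Prefer the constants in real code.
    static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.max";
    static final String AWS_REGION = "aws.region";

    static Properties consumerConfig(String region, int maxRecordsPerCall) {
        // A single GetRecords call accepts a limit between 1 and 10,000 records.
        if (maxRecordsPerCall < 1 || maxRecordsPerCall > 10_000) {
            throw new IllegalArgumentException("limit must be in [1, 10000]");
        }
        Properties config = new Properties();
        config.setProperty(AWS_REGION, region);
        config.setProperty(SHARD_GETRECORDS_MAX, Integer.toString(maxRecordsPerCall));
        return config;
    }

    public static void main(String[] args) {
        // "us-east-1" is a placeholder region for illustration only.
        Properties config = consumerConfig("us-east-1", 5000);
        // In a Flink job, this Properties object is what gets passed to the
        // FlinkKinesisConsumer constructor.
        System.out.println(config);
    }
}
```

The range check only mirrors the GetRecords limit mentioned above, so a misconfigured value fails when the job is built instead of at the first Kinesis call.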

I wonder why the default value for SHARD_GETRECORDS_MAX is chosen so low
(100x less than it could be). The Kinesis Client Library defaults to
5000 and it's recommended to use this default value:
http://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#consumer-app-reading-slower.

Thanks for the clarification!

Cheers,
Steffen

Re: Kinesis connector SHARD_GETRECORDS_MAX default value

Tzu-Li (Gordon) Tai
Hi Steffen,

I have to admit that I didn’t put too much thought into the default values for the Kinesis consumer.

I’d say it would be reasonable to change the default values to follow KCL’s settings. Could you file a JIRA for this?

In general, we might want to reconsider all the default values for configs related to the getRecords call, i.e.
- SHARD_GETRECORDS_MAX
- SHARD_GETRECORDS_INTERVAL_MILLIS
- SHARD_GETRECORDS_BACKOFF_*
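
As a rough illustration of what the SHARD_GETRECORDS_BACKOFF_* group controls, the sketch below computes a capped exponential backoff delay between retries of a failed getRecords call. This is a generic formulation, not the connector's actual code (which may, for example, add random jitter). The parameter names and the numbers in main are assumptions chosen for the example, not the connector's defaults.

```java
/** Sketch: capped exponential backoff between getRecords retries. */
final class GetRecordsBackoff {

    /**
     * Returns the delay before the given retry attempt (0-based):
     * baseMillis * expConstant^attempt, capped at maxMillis.
     */
    static long backoffMillis(long baseMillis, long maxMillis, double expConstant, int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        double delay = baseMillis * Math.pow(expConstant, attempt);
        return (long) Math.min((double) maxMillis, delay);
    }

    public static void main(String[] args) {
        // Hypothetical settings: 300 ms base, 1000 ms cap, growth factor 1.5.
        for (int attempt = 0; attempt < 4; attempt++) {
            System.out.println(attempt + ": " + backoffMillis(300, 1000, 1.5, attempt) + " ms");
        }
    }
}
```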

Cheers,
Gordon

On March 23, 2017 at 2:12:32 AM, Steffen Hausmann ([hidden email]) wrote:

Hi there,

I recently ran into problems with a Flink job running on an EMR cluster
consuming events from a Kinesis stream receiving roughly 15k
events/second. Although the EMR cluster was scaled up substantially and
CPU utilization and system load were well below any alarming threshold,
the job fell increasingly behind in processing the stream.

Eventually, it turned out that SHARD_GETRECORDS_MAX defaults to 100,
which apparently causes too much overhead when consuming events from
the stream. Increasing the value to 5000 made the problem go away (a
single GetRecords call to Kinesis can retrieve up to 10k records).

I wonder why the default value for SHARD_GETRECORDS_MAX is chosen so low
(100x less than it could be). The Kinesis Client Library defaults to
5000 and it's recommended to use this default value:
http://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#consumer-app-reading-slower.

Thanks for the clarification!

Cheers,
Steffen

Re: Kinesis connector SHARD_GETRECORDS_MAX default value

Steffen Hausmann
Hi Gordon,

thanks for looking into this and sorry it took me so long to file the
issue: https://issues.apache.org/jira/browse/FLINK-6365.

Really appreciate your contributions for the Kinesis connector!

Cheers,
Steffen

On 22/03/2017 20:21, Tzu-Li (Gordon) Tai wrote:

> Hi Steffen,
>
> I have to admit that I didn’t put too much thought into the default
> values for the Kinesis consumer.
>
> I’d say it would be reasonable to change the default values to follow
> KCL’s settings. Could you file a JIRA for this?
>
> In general, we might want to reconsider all the default values for
> configs related to the getRecords call, i.e.
> - SHARD_GETRECORDS_MAX
> - SHARD_GETRECORDS_INTERVAL_MILLIS
> - SHARD_GETRECORDS_BACKOFF_*
>
> Cheers,
> Gordon

Re: Kinesis connector SHARD_GETRECORDS_MAX default value

Tzu-Li (Gordon) Tai
Thanks for filing the JIRA!

Would you also be up for opening a PR for the change? That would be very, very helpful :)

Cheers,
Gordon

On 24 April 2017 at 3:27:48 AM, Steffen Hausmann ([hidden email]) wrote:

Hi Gordon,

thanks for looking into this and sorry it took me so long to file the
issue: https://issues.apache.org/jira/browse/FLINK-6365.

Really appreciate your contributions for the Kinesis connector!

Cheers,
Steffen

Re: Kinesis connector SHARD_GETRECORDS_MAX default value

Tzu-Li (Gordon) Tai
Hi Steffen,

Thanks for bringing up the discussion!

I think the reason SHARD_GETRECORDS_INTERVAL_MILLIS was defaulted to 0 in the first place was that we didn’t want to give the false impression that Flink introduces latency with the default settings.
For that reason, I’m leaning towards not touching SHARD_GETRECORDS_INTERVAL_MILLIS.
Ideally, the docs in this section [1] should guide users to tweak this setting if they’re having issues with competing apps that also consume the shards. We could also improve the docs if you think the notice for this issue needs to be more prominent.

However, I do suggest changing SHARD_GETRECORDS_MAX to a higher value. 100 seems too small as a default.
Increasing it should also be safe in the sense that it would not introduce unexpected behavior changes for the user.
What do you think?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kinesis.html#internally-used-kinesis-apis

On 23 June 2017 at 4:30:10 AM, Steffen Hausmann ([hidden email]) wrote:

Hi Gordon,

Regarding the value for SHARD_GETRECORDS_INTERVAL_MILLIS:

The best practice would be to set this value to 1000, as this setting
allows other applications to read from the same Kinesis stream. However,
this may considerably increase the latency of the respective Flink
application, as events are only consumed once every second.

Alternatively, the default value could be as low as 200 (you can
currently only read from a shard 5 times a second, hence values lower
than 200 are undesirable), which reduces the latency of a single Flink
application but causes undesirable effects when multiple applications
consume events from the same Kinesis stream.
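
To make the trade-off concrete, the following sketch estimates the per-shard read ceiling for a given combination of SHARD_GETRECORDS_MAX and SHARD_GETRECORDS_INTERVAL_MILLIS. It assumes only the 5 reads per second per shard limit mentioned above and ignores request latency and the per-shard byte limits, so real throughput will be lower. The class and method names are hypothetical.

```java
/** Sketch: upper bound on records/second a single consumer can read from one shard. */
final class ShardReadCeiling {

    // Kinesis allows at most 5 GetRecords calls per second per shard.
    static final int MAX_CALLS_PER_SECOND = 5;

    /** Calls per second implied by the polling interval, capped by the shard limit. */
    static double callsPerSecond(long intervalMillis) {
        if (intervalMillis <= 0) {
            // An interval of 0 means "poll as fast as possible"; the shard limit still applies.
            return MAX_CALLS_PER_SECOND;
        }
        return Math.min(MAX_CALLS_PER_SECOND, 1000.0 / intervalMillis);
    }

    /** Best-case records per second for one shard (no latency, no byte limits). */
    static double recordsPerSecond(int maxRecordsPerCall, long intervalMillis) {
        return maxRecordsPerCall * callsPerSecond(intervalMillis);
    }

    public static void main(String[] args) {
        int[] maxRecords = {100, 5000};
        long[] intervals = {0, 200, 1000};
        for (int max : maxRecords) {
            for (long interval : intervals) {
                System.out.printf("max=%d interval=%dms -> %.0f records/s%n",
                        max, interval, recordsPerSecond(max, interval));
            }
        }
    }
}
```

With a limit of 100 records per call, the ceiling is 500 records/second per shard even when polling as fast as the shard allows, which is below the 1,000 records/second a shard can ingest. With 5000 records per call, even a 1000 ms interval leaves a ceiling of 5,000 records/second per shard.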

I'd prefer setting the default to 1000, but I wanted to get your opinion
on this before I submit the PR.

Cheers,
Steffen

On 24/04/2017 00:39, Tzu-Li (Gordon) Tai wrote:

> Thanks for filing the JIRA!
>
> Would you also be up for opening a PR for the change? That would be very,
> very helpful :)
>
> Cheers,
> Gordon