Stateful Functions, Kinesis, and ConsumerConfigConstants


Ammon Diether

When using Flink Stateful Functions' KinesisIngressBuilder, I do not see a way to set consumer options such as ConsumerConfigConstants.SHARD_GETRECORDS_MAX or ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS.

Looking at KinesisSourceProvider, it appears to be where the FlinkKinesisConsumer is created. Its propertiesFromSpec(kinesisIngressSpec) method only passes through the AWS properties and a few startup-position properties, so ConsumerConfigConstants.SHARD_GETRECORDS_MAX cannot be provided.

Is there an obvious workaround?


Re: Stateful Functions, Kinesis, and ConsumerConfigConstants

Tzu-Li (Gordon) Tai
Hi Ammon,

Unfortunately, you're right. I think the FlinkKinesisConsumer-specific configs (e.g. the keys in the ConsumerConfigConstants class) were overlooked in the initial design.

One way to work around this is to use the `SourceFunctionSpec` [1]. With that spec, you can use any Flink SourceFunction (e.g. a FlinkKinesisConsumer) as the ingress.
Simply instantiate a `SourceFunctionSpec` with the desired ingress ID and a FlinkKinesisConsumer that you create directly. Since you construct the consumer yourself, you can pass it any of the ConsumerConfigConstants properties.
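A minimal sketch of that workaround, assuming an embedded Java module registered through the usual ServiceLoader file (META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule). The class name KinesisSourceModule, the ingress namespace/name, the target function type, the stream name, the region and the two numeric values are all hypothetical placeholders, and SimpleStringSchema assumes the records are UTF-8 strings. It is not the StateFun Kinesis ingress itself, just a hand-built FlinkKinesisConsumer wrapped in a SourceFunctionSpec.

```java
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

/** Sketch: a Kinesis ingress built from a hand-made FlinkKinesisConsumer. */
public final class KinesisSourceModule implements StatefulFunctionModule {

  // Hypothetical ingress identifier; namespace and name are placeholders.
  static final IngressIdentifier<String> INGRESS =
      new IngressIdentifier<>(String.class, "example", "kinesis-ingress");

  // Hypothetical target function type used by the router below.
  static final FunctionType TARGET = new FunctionType("example", "handler");

  /** Consumer properties, including keys the KinesisIngressBuilder does not expose. */
  static Properties consumerProperties() {
    Properties props = new Properties();
    // ConsumerConfigConstants extends AWSConfigConstants, so AWS_REGION is reachable here.
    props.setProperty(ConsumerConfigConstants.AWS_REGION, "us-west-2"); // placeholder region
    props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
    // The two settings from the question; the values are illustrative only.
    props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "1000");
    props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "500");
    return props;
  }

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    // "example-stream" is a placeholder stream name.
    FlinkKinesisConsumer<String> consumer =
        new FlinkKinesisConsumer<>("example-stream", new SimpleStringSchema(), consumerProperties());

    // Wrap the consumer so StateFun treats it as an ordinary ingress.
    binder.bindIngress(new SourceFunctionSpec<>(INGRESS, consumer));

    // Route each record to the hypothetical TARGET function, using the payload as the id.
    binder.bindIngressRouter(
        INGRESS, (message, downstream) -> downstream.forward(TARGET, message, message));
  }
}
```

Any other ConsumerConfigConstants key can be added to consumerProperties() the same way, since the properties go straight to the FlinkKinesisConsumer constructor.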

As a side note, I've created a JIRA issue to address this, as I believe it should be supported by the native StateFun Kinesis ingress [2].

Cheers,
Gordon

[1] https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/datastream/SourceFunctionSpec.java
[2] https://issues.apache.org/jira/browse/FLINK-22529

(In reply to Ammon Diether, Thu, Apr 29, 2021, 7:25 AM)


Re: Stateful Functions, Kinesis, and ConsumerConfigConstants

Ammon Diether
That does allow me to set up the ConsumerConfigConstants.
- It does have one downside: the SourceFunctionSpec has a different TYPE than the KinesisIngressSpec, so the hashed operator ID does not match. As a result, I had to restore with allowNonRestoredState.

But it is worth it. Thank you.

(In reply to Tzu-Li (Gordon) Tai, Thu, Apr 29, 2021, 4:44 AM)