Can't get the FlinkKinesisProducer to work against Kinesalite for tests

Bruno Aranda
Hi,

We have started to use Kinesis with Flink, and we need to be able to test Flink jobs that write to Kinesis. For that, we use a Docker image running Kinesalite.

To configure the producer, we follow the approach explained in the docs [1].

However, with that code, job submission fails, because the Flink Kinesis connector expects the configuration to contain either the endpoint or the region, but not both and not neither. (There is also a typo in the error message, where 'aws.region' is mentioned twice [2].)
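To make the "exactly one of region or endpoint" constraint concrete, here is a small hypothetical model of the check as we understand it from the behaviour above. It is not the connector's actual validation code; the object and method names are made up for illustration, and only the key names 'aws.region' and 'aws.endpoint' come from the error message.

```scala
import java.util.Properties

// Hypothetical model of the connector-side rule described above.
// NOT Flink's own validation code: it only encodes the observed behaviour
// that exactly one of the two keys may be present.
object RegionOrEndpointRule {
  val RegionKey = "aws.region"
  val EndpointKey = "aws.endpoint"

  // True only when exactly one of region/endpoint is set;
  // false when both are set (the docs-style config) or when neither is.
  def accepts(config: Properties): Boolean = {
    val hasRegion = config.containsKey(RegionKey)
    val hasEndpoint = config.containsKey(EndpointKey)
    hasRegion != hasEndpoint
  }
}
```

Under this model, a config that sets both keys is rejected at submission, while an endpoint-only config is accepted by Flink but then reaches the KPL without any region.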

On the other hand, if we only specify the endpoint, the KPL fails, complaining that no region is configured. It looks as if the endpoint is never actually set up on the Kinesis producer side? We are confused.

By contrast, the Flink consumer works as expected, and pointing its endpoint at Kinesalite works fine. The consumer follows a different path: it creates the AWS client through a call to AWSUtil [3], which does take the endpoint into account.

Are we missing something? We have tried this with Flink versions 1.3.2 through 1.6.1, building the Kinesis connector against the latest KPL versions.

Any help is appreciated,

Thanks!

Bruno


Re: Can't get the FlinkKinesisProducer to work against Kinesalite for tests

Bruno Aranda
Hi again,

We eventually managed to get data into Kinesalite using the FlinkKinesisProducer, but to do so we had to use a different configuration: we ignored the 'aws.endpoint' setting and instead used the properties that the Kinesis producer (KPL) configuration itself expects. So we pass our FlinkKinesisProducer a configuration such as:
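import java.util.Properties
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants

val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
producerConfig.put("KinesisEndpoint", "localhost")
producerConfig.put("KinesisPort", "4567")
producerConfig.put("VerifyCertificate", "false")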
We also had to make sure that Kinesalite itself was started with the `--ssl` parameter, so that it uses TLS and is reachable over HTTPS.
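For reuse across tests, the working configuration above can be wrapped in a small helper. This is a sketch under assumptions: the helper name and its host/port parameters are hypothetical, the literal "aws.region" is the value behind Flink's AWSConfigConstants.AWS_REGION, and the remaining keys are the KPL property names from the configuration above.

```scala
import java.util.Properties

// Hypothetical helper (not part of Flink or the KPL) that builds the producer
// Properties that worked against Kinesalite. As far as we can tell, keys such as
// "KinesisEndpoint" are handed through by FlinkKinesisProducer to the KPL.
object KinesaliteProducerConfig {
  def apply(host: String = "localhost", port: Int = 4567): Properties = {
    val props = new Properties()
    // Same key as AWSConfigConstants.AWS_REGION; the KPL refuses to start without a region.
    props.setProperty("aws.region", "us-east-1")
    // Host name only; the KPL is given the port separately.
    props.setProperty("KinesisEndpoint", host)
    props.setProperty("KinesisPort", port.toString)
    // Kinesalite runs with --ssl, and its certificate is not one the KPL
    // verifies, so verification is switched off as in the config above.
    props.setProperty("VerifyCertificate", "false")
    props
  }
}
```

A test could then create the sink with something like `new FlinkKinesisProducer[String](new SimpleStringSchema(), KinesaliteProducerConfig())`, passing a different host when Kinesalite runs in its own container.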

And, just as importantly: our tests run in Docker, and just before throwing in the towel we found out that you cannot use an Alpine-based image for this. If you want the Amazon KPL to work, the image needs to be one of the Debian-based ones.

Hope this saves someone all the days we have spent looking at it :)

Cheers,

Bruno

On Wed, 26 Sep 2018 at 14:59 Bruno Aranda <[hidden email]> wrote:


Re: Can't get the FlinkKinesisProducer to work against Kinesalite for tests

Fabian Hueske-2
Hi Bruno,

Thanks for sharing your approach!

Best, Fabian

On Thu, 27 Sep 2018 at 18:11 Bruno Aranda <[hidden email]> wrote: