Connecting to kinesis with mfa

Avi Levi-2
Hi guys,
We are struggling to connect to Kinesis when MFA is activated. I configured everything according to the documentation, but I am still getting an exception:


val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, awsRegion)
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey)
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretAccessKey)
producerConfig.put(com.amazonaws.auth.profile.internal.ProfileKeyConstants.AWS_SESSION_TOKEN, awsSessionToken)

with a very simple pipeline:

val producer = new FlinkKinesisProducer(new SimpleStringSchema(), producerConfig)
producer.setFailOnError(true)
producer.setDefaultStream(outputStreamName)
producer.setDefaultPartition("0")
env.fromElements("a", "b", "c").addSink(producer)
env.execute()

which results in:

15:30:44,292 WARN org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.292188] [0x0000cb5f][0x000070000512c000] [warning] [AWS Log: WARN](AWSClient)If the signature check failed. This could be because of a time skew. Attempting to adjust the signer.
15:30:44,378 INFO org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.377865] [0x0000cb5b][0x00007000082c1000] [info] [shard_map.cc:87] Updating shard map for stream "ExampleOutputStream"
15:30:44,396 WARN org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.396208] [0x0000cb55][0x0000700002a3e000] [warning] [AWS Log: WARN](AWSErrorMarshaller)Encountered AWSError 'UnrecognizedClientException': The security token included in the request is invalid.
15:30:44,396 ERROR org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.396256] [0x0000cb55][0x0000700002a3e000] [error] [AWS Log: ERROR](AWSClient)HTTP response code: 400
Exception name: UnrecognizedClientException
Error message: The security token included in the request is invalid.
6 response headers:
connection : close

I double-checked that all keys are correct, using the same keys that work perfectly when I execute commands from the CLI. Also, when MFA is removed from Kinesis, the pipeline works as expected. Finally, I have opened a ticket for this as well.

Re: Connecting to kinesis with mfa

rmetzger0
Hey Avi,

Maybe providing the access key/secret key together with the session token doesn't work, and you need to provide only one or the other?

I'll also ping some AWS contributors active in Flink to take a look at this.

Best,
Robert


Re: Connecting to kinesis with mfa

Avi Levi-2
Thanks Robert, I actually tried all of the above but got the same unfortunate result.


Re: Connecting to kinesis with mfa

Cranmer, Danny

Hey Avi,

I have reproduced the issue and found a solution. The problem is not MFA; it is that the BASIC credential provider does not use the session token:

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L181

If you want to supply an access key, secret key, and session token, you will have to use another CredentialProviderType; the example below uses SYS_PROP. We could improve the Kinesis connector to detect the session token and construct a BasicSessionCredentials:

https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/auth/BasicSessionCredentials.java

 

// Expose the temporary credentials as JVM system properties.
Properties systemProperties = System.getProperties();
systemProperties.setProperty("aws.accessKeyId", accessKey);
systemProperties.setProperty("aws.secretKey", secretKey);
systemProperties.setProperty("aws.sessionToken", sessionToken);

// Use the SYS_PROP credential provider instead of BASIC.
Properties producerConfig = new Properties();
producerConfig.setProperty(AWSConfigConstants.AWS_REGION, REGION);
producerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
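
For anyone who wants to reuse this, here is a minimal, JDK-only sketch of the same workaround wrapped in a helper. The class and method names are hypothetical. The literal keys "aws.region" and "aws.credentials.provider" are assumed to be the values behind AWSConfigConstants.AWS_REGION and AWSConfigConstants.AWS_CREDENTIALS_PROVIDER; in a real job it is safer to reference the constants directly, as in the snippet above.

```java
import java.util.Objects;
import java.util.Properties;

// Hypothetical helper: wraps the SYS_PROP workaround so that all three
// credential parts are always set together.
final class KinesisSessionCredentials {

    // System property names used in the workaround above.
    static final String ACCESS_KEY_PROP = "aws.accessKeyId";
    static final String SECRET_KEY_PROP = "aws.secretKey";
    static final String SESSION_TOKEN_PROP = "aws.sessionToken";

    // Assumption: these literals mirror AWSConfigConstants.AWS_REGION and
    // AWSConfigConstants.AWS_CREDENTIALS_PROVIDER.
    static final String REGION_KEY = "aws.region";
    static final String PROVIDER_KEY = "aws.credentials.provider";

    private KinesisSessionCredentials() {}

    // Publishes the temporary credentials as JVM system properties.
    static void exportToSystemProperties(String accessKey, String secretKey, String sessionToken) {
        System.setProperty(ACCESS_KEY_PROP, Objects.requireNonNull(accessKey, "accessKey"));
        System.setProperty(SECRET_KEY_PROP, Objects.requireNonNull(secretKey, "secretKey"));
        System.setProperty(SESSION_TOKEN_PROP, Objects.requireNonNull(sessionToken, "sessionToken"));
    }

    // Builds the producer config that selects SYS_PROP instead of BASIC.
    static Properties producerConfig(String region) {
        Properties config = new Properties();
        config.setProperty(REGION_KEY, Objects.requireNonNull(region, "region"));
        config.setProperty(PROVIDER_KEY, "SYS_PROP");
        return config;
    }
}
```

Usage would look like calling KinesisSessionCredentials.exportToSystemProperties(accessKey, secretKey, sessionToken) before building the sink, then passing KinesisSessionCredentials.producerConfig(REGION) to the FlinkKinesisProducer constructor. Keep in mind that system properties are per JVM, so on a cluster they presumably need to be present in the JVMs that actually run the sink, not only in the client that submits the job.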

 

I will also add this to the Jira ticket. Let me know if you have any issues.

 

Thanks,

Danny

 


Re: Connecting to kinesis with mfa

Avi Levi-2
Awesome, thanks! Looks good.

