Example flink run with security options? Running on k8s in my case

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Example flink run with security options? Running on k8s in my case

Adam Roberts
Hey everyone, I've been experimenting with Flink using https://github.com/GoogleCloudPlatform/flink-on-k8s-operator and I believe I've successfully deployed a JobManager and TaskManager with security enabled, and a self-signed certificate (the pods come up great).
 
However, I can't do much with this - I can't port-forward and access the UI, nor can I submit jobs to it by running another pod and using the DNS name lookup of the service.
 
I always get
 
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
 
...
 
Caused by: org.apache.flink.runtime.rest.ConnectionClosedException: Channel became inactive.
    ... 37 more
 
 
and this is even with all of the -D security options provided.
 
The versions of Flink are the same for both my Job and my FlinkCluster (1.11.1).
 
Is this a sensible thing to do? If I weren't using the operator for example, would users be expected to flink run with all of these options?
 
Does anything look odd here? My guess is because security's on, the Job Manager refuses to talk to my submitter.
 
Running as the flink user in the container, I do
 

      securityContext:

        runAsUser: 9999

        runAsGroup: 9999

      containers:

      - name: wordcount

        image: adamroberts/mycoolflink:latest

        args:

        - /opt/flink/bin/flink

        - run

        - -D

        - security.ssl.rest.keystore=/etc/flink-secrets/flink-tls-keystore.key

        - -D 

        - security.ssl.rest.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks

        - -D 

        - security.ssl.rest.keystore-password=thepass # Replace with value of flink-tls-keystore.password

        - -D 

        - security.ssl.rest.key-password=thepass # Replace with value of tls.p12.password

        - -D 

        - security.ssl.rest.truststore-password=thepass # Replace with value of flink-tls-ca.truststore.password

        - -D 

        - security.ssl.internal.keystore=/etc/flink-secrets/flink-tls-keystore.key

        - -D 

        - security.ssl.internal.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks

        - -D 

        - security.ssl.internal.keystore-password=thepass # Replace with value of flink-tls-keystore.password

        - -D 

        - security.ssl.internal.key-password=thepass # Replace with value of flink-tls-keystore.password

        - -D 

        - security.ssl.internal.truststore-password=thepass # Replace with value of flink-tls-truststore.password

        - -m

        - tls-flink-cluster-1-11-jobmanager:8081

        - /opt/flink/examples/batch/WordCount.jar 

        - --input 

        - /opt/flink/NOTICE

 
with the secrets mounted in at the above location (if I exec into my container, I can see they're all there OK). Note that it is a read-only file system.
 
adamroberts/mycoolflink (at this time of this email) is just based on https://github.com/apache/flink-docker.
 
Thanks!
 
Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 741598.
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU

Reply | Threaded
Open this post in threaded view
|

Re: Example flink run with security options? Running on k8s in my case

Nico Kruber-2
Hi Adam,
the flink binary will pick up any configuration from the flink-conf.yaml of
its directory. If that is the same as in the cluster, you wouldn't have to
pass most of your parameters manually. However, if you prefer not having a
flink-conf.yaml in place, you could remove the security.ssl.internal.*
parameter from its call since those only affect internal communication.

If the client's connection to the JM is denied, you would actually have this
in the JM logs as well which you could check.

To check whether your whole setup works, I would suggest to try without
security enabled first and then enable it (just to rule out any other issues)

From the commands you mentioned, it looks like you're just missing
security.ssl.rest.enabled=true and because of that, the client would not use
SSL for the connection.

For more information and setup, I recommend reading through [1] which also
contains an example at the bottom of the page and how to use curl to test or
use the REST endpoint.


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html

On Tuesday, 25 August 2020 14:40:04 CEST Adam Roberts wrote:

> Hey everyone, I've been experimenting with Flink
> using https://github.com/GoogleCloudPlatform/flink-on-k8s-operator and I
> believe I've successfully deployed a JobManager and TaskManager with
> security enabled, and a self-signed certificate (the pods come up great).
> However, I can't do much with this - I can't port-forward and access the UI,
> nor can I submit jobs to it by running another pod and using the DNS name
> lookup of the service.
> I always get
>  
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.
> ...
>  
> Caused by: org.apache.flink.runtime.rest.ConnectionClosedException: Channel
> became inactive. ... 37 more
>  
>  
> and this is even with all of the -D security options provided.
>  
> The versions of Flink are the same for both my Job and my FlinkCluster
> (1.11.1).
> Is this a sensible thing to do? If I weren't using the operator for example,
> would users be expected to flink run with all of these options?
> Does anything look odd here? My guess is because security's on, the Job
> Manager refuses to talk to my submitter.
> Running as the flink user in the container, I do
>  
>
>       securityContext:
>
>         runAsUser: 9999
>
>         runAsGroup: 9999
>
>       containers:
>
>       - name: wordcount
>
>         image: adamroberts/mycoolflink:latest
>
>         args:
>
>         - /opt/flink/bin/flink
>
>         - run
>
>         - -D
>
>         -
> security.ssl.rest.keystore=/etc/flink-secrets/flink-tls-keystore.key
>
>         - -D
>
>         -
> security.ssl.rest.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
>
>         - -D
>
>         - security.ssl.rest.keystore-password=thepass # Replace with value
> of flink-tls-keystore.password
>
>         - -D
>
>         - security.ssl.rest.key-password=thepass # Replace with value of
> tls.p12.password
>
>         - -D
>
>         - security.ssl.rest.truststore-password=thepass # Replace with value
> of flink-tls-ca.truststore.password
>
>         - -D
>
>         -
> security.ssl.internal.keystore=/etc/flink-secrets/flink-tls-keystore.key
>
>         - -D
>
>         -
> security.ssl.internal.truststore=/etc/flink-secrets/flink-tls-ca-truststore
> .jks
>
>         - -D
>
>         - security.ssl.internal.keystore-password=thepass # Replace with
> value of flink-tls-keystore.password
>
>         - -D
>
>         - security.ssl.internal.key-password=thepass # Replace with value of
> flink-tls-keystore.password
>
>         - -D
>
>         - security.ssl.internal.truststore-password=thepass # Replace with
> value of flink-tls-truststore.password
>
>         - -m
>
>         - tls-flink-cluster-1-11-jobmanager:8081
>
>         - /opt/flink/examples/batch/WordCount.jar
>
>         - --input
>
>         - /opt/flink/NOTICE
>
>  
> with the secrets mounted in at the above location (if I exec into my
> container, I can see they're all there OK). Note that it is a read-only
> file system.
> adamroberts/mycoolflink (at this time of this email) is just based
> on https://github.com/apache/flink-docker.
> Thanks!
>  
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> PO6 3AU



Reply | Threaded
Open this post in threaded view
|

RE: Example flink run with security options? Running on k8s in my case

Adam Roberts
Hey Nico - thanks for the prompt response, good catch - I've just tried with the two security options (enabling rest and internal SSL communications) and still hit the same problem
 
I've also tried turning off security (both in my Job definition and in my Flink cluster JobManager/TaskManager settings) and the communication does happen successfully, suggesting all is well otherwise.
 
With regards to testing with just a regular curl, I switched security back on and did the curl, using this:
 

openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes

curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081

curl --cacert rest.pem --cert rest.pem tls-flink-cluster-1-11-jobmanager:8081

 
from the Job CR pod, which is who runs the flink run against my JobManager i'd like to connect to.
 
That gives 
 

$ openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes

curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081

curl --cacert rest.pem --cert rest.pem tls-flink-cluster-1-11-jobmanager:8081139676043637888:error:0D07207B:asn1 encoding routines:ASN1_get_object:header too long:../crypto/asn1/asn1_lib.c:101:

so I wonder if my security set up itself is flawed...I'll be happy to share the scripting I have to do that if folks feel it'll be of use, thanks again
 
----- Original message -----
From: Nico Kruber <[hidden email]>
To: [hidden email]
Cc: Adam Roberts <[hidden email]>
Subject: [EXTERNAL] Re: Example flink run with security options? Running on k8s in my case
Date: Wed, Aug 26, 2020 11:40 AM
 
Hi Adam,
the flink binary will pick up any configuration from the flink-conf.yaml of
its directory. If that is the same as in the cluster, you wouldn't have to
pass most of your parameters manually. However, if you prefer not having a
flink-conf.yaml in place, you could remove the security.ssl.internal.*
parameter from its call since those only affect internal communication.

If the client's connection to the JM is denied, you would actually have this
in the JM logs as well which you could check.

To check whether your whole setup works, I would suggest to try without
security enabled first and then enable it (just to rule out any other issues)

From the commands you mentioned, it looks like you're just missing
security.ssl.rest.enabled=true and because of that, the client would not use
SSL for the connection.

For more information and setup, I recommend reading through [1] which also
contains an example at the bottom of the page and how to use curl to test or
use the REST endpoint.


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html 

On Tuesday, 25 August 2020 14:40:04 CEST Adam Roberts wrote:

> Hey everyone, I've been experimenting with Flink
> using https://github.com/GoogleCloudPlatform/flink-on-k8s-operator  and I
> believe I've successfully deployed a JobManager and TaskManager with
> security enabled, and a self-signed certificate (the pods come up great).
> However, I can't do much with this - I can't port-forward and access the UI,
> nor can I submit jobs to it by running another pod and using the DNS name
> lookup of the service.
> I always get
>  
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.
> ...
>  
> Caused by: org.apache.flink.runtime.rest.ConnectionClosedException: Channel
> became inactive. ... 37 more
>  
>  
> and this is even with all of the -D security options provided.
>  
> The versions of Flink are the same for both my Job and my FlinkCluster
> (1.11.1).
> Is this a sensible thing to do? If I weren't using the operator for example,
> would users be expected to flink run with all of these options?
> Does anything look odd here? My guess is because security's on, the Job
> Manager refuses to talk to my submitter.
> Running as the flink user in the container, I do
>  
>
>       securityContext:
>
>         runAsUser: 9999
>
>         runAsGroup: 9999
>
>       containers:
>
>       - name: wordcount
>
>         image: adamroberts/mycoolflink:latest
>
>         args:
>
>         - /opt/flink/bin/flink
>
>         - run
>
>         - -D
>
>         -
> security.ssl.rest.keystore=/etc/flink-secrets/flink-tls-keystore.key
>
>         - -D
>
>         -
> security.ssl.rest.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
>
>         - -D
>
>         - security.ssl.rest.keystore-password=thepass # Replace with value
> of flink-tls-keystore.password
>
>         - -D
>
>         - security.ssl.rest.key-password=thepass # Replace with value of
> tls.p12.password
>
>         - -D
>
>         - security.ssl.rest.truststore-password=thepass # Replace with value
> of flink-tls-ca.truststore.password
>
>         - -D
>
>         -
> security.ssl.internal.keystore=/etc/flink-secrets/flink-tls-keystore.key
>
>         - -D
>
>         -
> security.ssl.internal.truststore=/etc/flink-secrets/flink-tls-ca-truststore
> .jks
>
>         - -D
>
>         - security.ssl.internal.keystore-password=thepass # Replace with
> value of flink-tls-keystore.password
>
>         - -D
>
>         - security.ssl.internal.key-password=thepass # Replace with value of
> flink-tls-keystore.password
>
>         - -D
>
>         - security.ssl.internal.truststore-password=thepass # Replace with
> value of flink-tls-truststore.password
>
>         - -m
>
>         - tls-flink-cluster-1-11-jobmanager:8081
>
>         - /opt/flink/examples/batch/WordCount.jar
>
>         - --input
>
>         - /opt/flink/NOTICE
>
>  
> with the secrets mounted in at the above location (if I exec into my
> container, I can see they're all there OK). Note that it is a read-only
> file system.
> adamroberts/mycoolflink (at this time of this email) is just based
> on https://github.com/apache/flink-docker .
> Thanks!
>  
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> PO6 3AU


 
 
Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 741598.
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU

Reply | Threaded
Open this post in threaded view
|

Re: Example flink run with security options? Running on k8s in my case

Andrey Zagrebin-5
Hi Adam,

maybe also check your SSL setup in a local cluster to exclude possibly related k8s things.

Best,
Andrey

On Wed, Aug 26, 2020 at 3:59 PM Adam Roberts <[hidden email]> wrote:
Hey Nico - thanks for the prompt response, good catch - I've just tried with the two security options (enabling rest and internal SSL communications) and still hit the same problem
 
I've also tried turning off security (both in my Job definition and in my Flink cluster JobManager/TaskManager settings) and the communication does happen successfully, suggesting all is well otherwise.
 
With regards to testing with just a regular curl, I switched security back on and did the curl, using this:
 

openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes

curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081

curl --cacert rest.pem --cert rest.pem tls-flink-cluster-1-11-jobmanager:8081

 
from the Job CR pod, which is who runs the flink run against my JobManager i'd like to connect to.
 
That gives 
 

$ openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes

curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081

curl --cacert rest.pem --cert rest.pem tls-flink-cluster-1-11-jobmanager:8081139676043637888:error:0D07207B:asn1 encoding routines:ASN1_get_object:header too long:../crypto/asn1/asn1_lib.c:101:

so I wonder if my security set up itself is flawed...I'll be happy to share the scripting I have to do that if folks feel it'll be of use, thanks again
 
----- Original message -----
From: Nico Kruber <[hidden email]>
To: [hidden email]
Cc: Adam Roberts <[hidden email]>
Subject: [EXTERNAL] Re: Example flink run with security options? Running on k8s in my case
Date: Wed, Aug 26, 2020 11:40 AM
 
Hi Adam,
the flink binary will pick up any configuration from the flink-conf.yaml of
its directory. If that is the same as in the cluster, you wouldn't have to
pass most of your parameters manually. However, if you prefer not having a
flink-conf.yaml in place, you could remove the security.ssl.internal.*
parameter from its call since those only affect internal communication.

If the client's connection to the JM is denied, you would actually have this
in the JM logs as well which you could check.

To check whether your whole setup works, I would suggest to try without
security enabled first and then enable it (just to rule out any other issues)

From the commands you mentioned, it looks like you're just missing
security.ssl.rest.enabled=true and because of that, the client would not use
SSL for the connection.

For more information and setup, I recommend reading through [1] which also
contains an example at the bottom of the page and how to use curl to test or
use the REST endpoint.


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html 

On Tuesday, 25 August 2020 14:40:04 CEST Adam Roberts wrote:

> Hey everyone, I've been experimenting with Flink
> using https://github.com/GoogleCloudPlatform/flink-on-k8s-operator  and I
> believe I've successfully deployed a JobManager and TaskManager with
> security enabled, and a self-signed certificate (the pods come up great).
> However, I can't do much with this - I can't port-forward and access the UI,
> nor can I submit jobs to it by running another pod and using the DNS name
> lookup of the service.
> I always get
>  
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.
> ...
>  
> Caused by: org.apache.flink.runtime.rest.ConnectionClosedException: Channel
> became inactive. ... 37 more
>  
>  
> and this is even with all of the -D security options provided.
>  
> The versions of Flink are the same for both my Job and my FlinkCluster
> (1.11.1).
> Is this a sensible thing to do? If I weren't using the operator for example,
> would users be expected to flink run with all of these options?
> Does anything look odd here? My guess is because security's on, the Job
> Manager refuses to talk to my submitter.
> Running as the flink user in the container, I do
>  
>
>       securityContext:
>
>         runAsUser: 9999
>
>         runAsGroup: 9999
>
>       containers:
>
>       - name: wordcount
>
>         image: adamroberts/mycoolflink:latest
>
>         args:
>
>         - /opt/flink/bin/flink
>
>         - run
>
>         - -D
>
>         -
> security.ssl.rest.keystore=/etc/flink-secrets/flink-tls-keystore.key
>
>         - -D
>
>         -
> security.ssl.rest.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
>
>         - -D
>
>         - security.ssl.rest.keystore-password=thepass # Replace with value
> of flink-tls-keystore.password
>
>         - -D
>
>         - security.ssl.rest.key-password=thepass # Replace with value of
> tls.p12.password
>
>         - -D
>
>         - security.ssl.rest.truststore-password=thepass # Replace with value
> of flink-tls-ca.truststore.password
>
>         - -D
>
>         -
> security.ssl.internal.keystore=/etc/flink-secrets/flink-tls-keystore.key
>
>         - -D
>
>         -
> security.ssl.internal.truststore=/etc/flink-secrets/flink-tls-ca-truststore
> .jks
>
>         - -D
>
>         - security.ssl.internal.keystore-password=thepass # Replace with
> value of flink-tls-keystore.password
>
>         - -D
>
>         - security.ssl.internal.key-password=thepass # Replace with value of
> flink-tls-keystore.password
>
>         - -D
>
>         - security.ssl.internal.truststore-password=thepass # Replace with
> value of flink-tls-truststore.password
>
>         - -m
>
>         - tls-flink-cluster-1-11-jobmanager:8081
>
>         - /opt/flink/examples/batch/WordCount.jar
>
>         - --input
>
>         - /opt/flink/NOTICE
>
>  
> with the secrets mounted in at the above location (if I exec into my
> container, I can see they're all there OK). Note that it is a read-only
> file system.
> adamroberts/mycoolflink (at this time of this email) is just based
> on https://github.com/apache/flink-docker .
> Thanks!
>  
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> PO6 3AU


 
 
Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 741598.
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU

Reply | Threaded
Open this post in threaded view
|

RE: Example flink run with security options? Running on k8s in my case

Adam Roberts
Hey folks, outside of Kubernetes things are great yep, with the same generated files.
 
So to share what I'm doing a little more... and I've modified things to be more inline with the current docs
 
keytool -genkeypair -alias flink.internal -keystore internal.keystore -dname "CN=flink.internal" -storepass internal_store_password -keyalg RSA -keysize 4096 -storetype PKCS12
keytool -genkeypair -alias flink.rest -keystore rest.keystore -dname "CN=myhost.company.org" -ext "SAN=dns:myhost.company.org,ip:127.0.0.1" -storepass rest_keystore_password -keyalg RSA -keysize 4096 -storetype PKCS12
keytool -exportcert -keystore rest.keystore -alias flink.rest -storepass rest_keystore_password -file flink.cer
keytool -importcert -keystore rest.truststore -alias flink.rest -storepass rest_truststore_password -file flink.cer -noprompt
kubectl delete secret flink-tls-secret-2
# Create the simpler secret from main docs for Flink
cat << EOF | kubectl create -n abp -f -
  apiVersion: v1
  kind: Secret
  type: Opaque
  metadata:
    name: flink-tls-secret-2
  data:
    rest.keystore: $(cat ./rest.keystore | base64 | tr -d '\n')
    rest.truststore: $(cat ./rest.truststore | base64 | tr -d '\n')
    internal.keystore: $(cat ./internal.keystore | base64 | tr -d '\n')
    internal.truststore: $(cat ./internal.keystore | base64 | tr -d '\n')
EOF
 
I run this script to get flink-tls-secret-2 with those files in, the keytool commands should be familiar since they're from the Flink 1.11 security docs).
 
Note I don't have a file called internal.truststore but neither do the docs, they mention file.truststore but don't tell me how that's made...maybe this is the problem? But things are fine with my normal Flink outside of Kubernetes set up.
 
The Job CustomResource does:

apiVersion: batch/v1
kind: Job
metadata:
  name: sample-job
  labels:
    app: flink-job
spec:
  template:
    spec:
      # Run as flink user
      securityContext:
        runAsUser: 9999
        runAsGroup: 9999
      containers:
      - name: wordcount
        # Replace this to be a Docker image with your built Flink app at a known location
        # Your build of Flink should be based on https://github.com/apache/flink-docker/tree/master/1.11/scala_2.12-java8-debian
        # with a modification to the Dockerfile to add your jar in (with a COPY)
        image: adamroberts/mycoolflink:latest
        - /opt/flink/bin/flink
        - run
        - -D security.ssl.internal.enabled=true
        - -D security.ssl.rest.enabled=true
        - -D security.ssl.rest.keystore=/etc/flink-secrets/rest.keystore
        - -D security.ssl.rest.truststore=/etc/flink-secrets/rest.truststore
        - -D security.ssl.rest.keystore-password=rest_keystore_password
        - -D security.ssl.rest.key-password=rest_keystore_password
        - -D security.ssl.rest.truststore-password=rest_truststore_password
        - -D security.ssl.internal.keystore=/etc/flink-secrets/internal.keystore
        - -D security.ssl.internal.truststore=/etc/flink-secrets/internal.keystore
        - -D security.ssl.internal.keystore-password=internal_store_password
        - -D security.ssl.internal.key-password=internal_store_password
        - -D security.ssl.internal.truststore-password=internal_store_password
        - -m
        - tls-flink-cluster-1-11-jobmanager:8081
        - /opt/flink/examples/batch/WordCount.jar 
        - --input 
        - /opt/flink/NOTICE
        volumeMounts:
          - name: flink-secret-volume
            mountPath: /etc/flink-secrets
      volumes:
      - name: flink-secret-volume
        secret:
          secretName: flink-tls-secret-2
      restartPolicy: Never
 
If I modify that to be a simple curl image but keeping the secrets mounted in, I can kubectl exec in and curl the JobManager at  tls-flink-cluster-1-11-jobmanager:8081 - I get no response, but I get an error if I go to a different port or URL.
 
The secrets do look ok inside the container too.
 
The Cluster spec looks like this now
 
apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: tls-flink-cluster-1-11
spec:
  jobManager:
    volumeMounts:
    - name: flink-secret-volume
      mountPath: /etc/flink-secrets
    volumes:
    - name: flink-secret-volume
      secret:
        secretName: flink-tls-secret-2
    resources:
      limits:
        memory: 600Mi
        cpu: "1.0"
  taskManager:
    volumeMounts:
      - name: flink-secret-volume
        mountPath: /etc/flink-secrets
    volumes:
    - name: flink-secret-volume
      secret:
        secretName: flink-tls-secret-2
    replicas: 1
    resources:
      limits:
        memory: 1Gi
        cpu: "1.0"
  image:
    name: adamroberts/mycoolflink:latest
  flinkProperties:
    # https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html is helpful for this part.
    web.submit.enable: "false"
    security.ssl.rest.enabled: "true"
    security.ssl.rest.keystore: "/etc/flink-secrets/rest.keystore"
    security.ssl.rest.truststore: "/etc/flink-secrets/rest.truststore"
    security.ssl.rest.keystore-password: "rest_keystore_password"
    security.ssl.rest.key-password: "rest_keystore_password"
    security.ssl.rest.truststore-password: "rest_truststore_password"
    security.ssl.internal.enabled: "true"
    security.ssl.internal.keystore: "/etc/flink-secrets/internal.keystore"
    security.ssl.internal.truststore: "/etc/flink-secrets/internal.keystore"
    security.ssl.internal.keystore-password: "internal_store_password"
    security.ssl.internal.key-password: "internal_store_password"
    security.ssl.internal.truststore-password: "internal_store_password"
    taskmanager.numberOfTaskSlots: "1"
    jobmanager.heap.size: ""                # set empty value (only for Flink version 1.11 or above)
    jobmanager.memory.process.size: 1gb   # job manager memory limit  (only for Flink version 1.11 or above)
    taskmanager.heap.size: ""               # set empty value
    taskmanager.memory.process.size: 1gb    # task manager memory limit
 
Cheers,
----- Original message -----
From: Andrey Zagrebin <[hidden email]>
To: Adam Roberts <[hidden email]>
Cc: [hidden email], user <[hidden email]>
Subject: [EXTERNAL] Re: Example flink run with security options? Running on k8s in my case
Date: Wed, Aug 26, 2020 5:35 PM
 
Hi Adam,

maybe also check your SSL setup in a local cluster to exclude possibly related k8s things.

Best,
Andrey
 
On Wed, Aug 26, 2020 at 3:59 PM Adam Roberts <[hidden email]> wrote:
Hey Nico - thanks for the prompt response, good catch - I've just tried with the two security options (enabling rest and internal SSL communications) and still hit the same problem
 
I've also tried turning off security (both in my Job definition and in my Flink cluster JobManager/TaskManager settings) and the communication does happen successfully, suggesting all is well otherwise.
 
With regards to testing with just a regular curl, I switched security back on and did the curl, using this:
 

openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes

curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081

curl --cacert rest.pem --cert rest.pem tls-flink-cluster-1-11-jobmanager:8081

 
from the Job CR pod, which is who runs the flink run against my JobManager i'd like to connect to.
 
That gives 
 

$ openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes

curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081

curl --cacert rest.pem --cert rest.pem tls-flink-cluster-1-11-jobmanager:8081139676043637888:error:0D07207B:asn1 encoding routines:ASN1_get_object:header too long:../crypto/asn1/asn1_lib.c:101:

so I wonder if my security set up itself is flawed...I'll be happy to share the scripting I have to do that if folks feel it'll be of use, thanks again
 
----- Original message -----
From: Nico Kruber <[hidden email]>
To: [hidden email]
Cc: Adam Roberts <[hidden email]>
Subject: [EXTERNAL] Re: Example flink run with security options? Running on k8s in my case
Date: Wed, Aug 26, 2020 11:40 AM
 
Hi Adam,
the flink binary will pick up any configuration from the flink-conf.yaml of
its directory. If that is the same as in the cluster, you wouldn't have to
pass most of your parameters manually. However, if you prefer not having a
flink-conf.yaml in place, you could remove the security.ssl.internal.*
parameter from its call since those only affect internal communication.

If the client's connection to the JM is denied, you would actually have this
in the JM logs as well which you could check.

To check whether your whole setup works, I would suggest to try without
security enabled first and then enable it (just to rule out any other issues)

From the commands you mentioned, it looks like you're just missing
security.ssl.rest.enabled=true and because of that, the client would not use
SSL for the connection.

For more information and setup, I recommend reading through [1] which also
contains an example at the bottom of the page and how to use curl to test or
use the REST endpoint.


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html 

On Tuesday, 25 August 2020 14:40:04 CEST Adam Roberts wrote:

> Hey everyone, I've been experimenting with Flink
> using https://github.com/GoogleCloudPlatform/flink-on-k8s-operator  and I
> believe I've successfully deployed a JobManager and TaskManager with
> security enabled, and a self-signed certificate (the pods come up great).
> However, I can't do much with this - I can't port-forward and access the UI,
> nor can I submit jobs to it by running another pod and using the DNS name
> lookup of the service.
> I always get
>  
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.
> ...
>  
> Caused by: org.apache.flink.runtime.rest.ConnectionClosedException: Channel
> became inactive. ... 37 more
>  
>  
> and this is even with all of the -D security options provided.
>  
> The versions of Flink are the same for both my Job and my FlinkCluster
> (1.11.1).
> Is this a sensible thing to do? If I weren't using the operator for example,
> would users be expected to flink run with all of these options?
> Does anything look odd here? My guess is because security's on, the Job
> Manager refuses to talk to my submitter.
> Running as the flink user in the container, I do
>  
>
>       securityContext:
>
>         runAsUser: 9999
>
>         runAsGroup: 9999
>
>       containers:
>
>       - name: wordcount
>
>         image: adamroberts/mycoolflink:latest
>
>         args:
>
>         - /opt/flink/bin/flink
>
>         - run
>
>         - -D
>
>         -
> security.ssl.rest.keystore=/etc/flink-secrets/flink-tls-keystore.key
>
>         - -D
>
>         -
> security.ssl.rest.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
>
>         - -D
>
>         - security.ssl.rest.keystore-password=thepass # Replace with value
> of flink-tls-keystore.password
>
>         - -D
>
>         - security.ssl.rest.key-password=thepass # Replace with value of
> tls.p12.password
>
>         - -D
>
>         - security.ssl.rest.truststore-password=thepass # Replace with value
> of flink-tls-ca.truststore.password
>
>         - -D
>
>         -
> security.ssl.internal.keystore=/etc/flink-secrets/flink-tls-keystore.key
>
>         - -D
>
>         -
> security.ssl.internal.truststore=/etc/flink-secrets/flink-tls-ca-truststore
> .jks
>
>         - -D
>
>         - security.ssl.internal.keystore-password=thepass # Replace with
> value of flink-tls-keystore.password
>
>         - -D
>
>         - security.ssl.internal.key-password=thepass # Replace with value of
> flink-tls-keystore.password
>
>         - -D
>
>         - security.ssl.internal.truststore-password=thepass # Replace with
> value of flink-tls-truststore.password
>
>         - -m
>
>         - tls-flink-cluster-1-11-jobmanager:8081
>
>         - /opt/flink/examples/batch/WordCount.jar
>
>         - --input
>
>         - /opt/flink/NOTICE
>
>  
> with the secrets mounted in at the above location (if I exec into my
> container, I can see they're all there OK). Note that it is a read-only
> file system.
> adamroberts/mycoolflink (at this time of this email) is just based
> on https://github.com/apache/flink-docker .
> Thanks!
>  
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> PO6 3AU


 
 
Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 741598.
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
 
 
Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 741598.
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU

Reply | Threaded
Open this post in threaded view
|

Re: Example flink run with security options? Running on k8s in my case

Nico Kruber-3
Actually, your curl command may be incorrect since you didn't specify https as
the protocol: Its man page says:
> If you specify URL without protocol:// prefix, curl will attempt to guess
> what protocol you might want. It will then default to HTTP but try other
> protocols based on often-used host name prefixes. For example, for host
> names starting with "ftp." curl will assume you want to speak FTP.

So I guess it wasn't actually using that and failed to connect. Unfortunately,
an empty response doesn't tell you much since it could have established a
connection which was then reset by the server. Please use one of Flink's REST
endpoints[1] to verify - these should have some content in the response.

It may also be useful to pair curl with `--verbose` for more output and also
look at the JM logs for any such problems.


I'm not sure how the GCP flink operator sets things up, but if submitting the
job is independent of starting the JM and TM pods, you don't need any of the
internal SSL configuration parameters for submitting a job. This is a per-
cluster setting!


As for the certificate generation: I'm not sure "myhost.company.org,ip:
127.0.0.1" would work here if the client is accessing the JM via the name
"tls-flink-cluster-1-11-jobmanager"...I'm not 100% sure here, but I would
assume there is verification on the actual URL that the certificate is
supposed to secure. What you were saying when creating it was that the URL is
either "myhost.company.org" or "127.0.0.1" which is not correct in the non-
local case.


Just one further note here: Because setting up SSL can be difficult, our
Ververica Platform (also on the free-to-use community edition) comes with a
SSL setup [2] that you can enable with a click of a button and it just works
as expected. Maybe also something to check out (not just for configuring SSL).
Feel free to contact me personally for more in this regard.


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/
rest_api.html
[2] https://docs.ververica.com/user_guide/deployments/
configure_flink.html#ssl-tls-setup

On Thursday, 27 August 2020 13:36:45 CEST Adam Roberts wrote:

> Hey folks, outside of Kubernetes things are great yep, with the same
> generated files.
> So to share what I'm doing a little more... and I've modified things to be
> more inline with the current docs
> keytool -genkeypair -alias flink.internal -keystore internal.keystore -dname
> "CN=flink.internal" -storepass internal_store_password -keyalg RSA -keysize
> 4096 -storetype PKCS12 keytool -genkeypair -alias flink.rest -keystore
> rest.keystore -dname "CN=myhost.company.org" -ext
> "SAN=dns:myhost.company.org,ip:127.0.0.1" -storepass rest_keystore_password
> -keyalg RSA -keysize 4096 -storetype PKCS12 keytool -exportcert -keystore
> rest.keystore -alias flink.rest -storepass rest_keystore_password -file
> flink.cer keytool -importcert -keystore rest.truststore -alias flink.rest
> -storepass rest_truststore_password -file flink.cer -noprompt kubectl
> delete secret flink-tls-secret-2
> # Create the simpler secret from main docs for Flink
> cat << EOF | kubectl create -n abp -f -
>   apiVersion: v1
>   kind: Secret
>   type: Opaque
>   metadata:
>     name: flink-tls-secret-2
>   data:
>     rest.keystore: $(cat ./rest.keystore | base64 | tr -d '\n')
>     rest.truststore: $(cat ./rest.truststore | base64 | tr -d '\n')
>     internal.keystore: $(cat ./internal.keystore | base64 | tr -d '\n')
>     internal.truststore: $(cat ./internal.keystore | base64 | tr -d '\n')
> EOF
>  
> I run this script to get flink-tls-secret-2 with those files in, the keytool
> commands should be familiar since they're from the Flink 1.11 security
> docs).
> Note I don't have a file called internal.truststore but neither do the docs,
> they mention file.truststore but don't tell me how that's made...maybe this
> is the problem? But things are fine with my normal Flink outside of
> Kubernetes set up.
> The Job CustomResource does:
>
> apiVersion: batch/v1
> kind: Job
> metadata:
>   name: sample-job
>   labels:
>     app: flink-job
> spec:
>   template:
>     spec:
>       # Run as flink user
>       securityContext:
>         runAsUser: 9999
>         runAsGroup: 9999
>       containers:
>       - name: wordcount
>         # Replace this to be a Docker image with your built Flink app at a
> known location # Your build of Flink should be based on
> https://github.com/apache/flink-docker/tree/master/1.11/scala_2.12-java8-de
> bian # with a modification to the Dockerfile to add your jar in (with a
> COPY) image: adamroberts/mycoolflink:latest
>         - /opt/flink/bin/flink
>         - run
>         - -D security.ssl.internal.enabled=true
>         - -D security.ssl.rest.enabled=true
>         - -D security.ssl.rest.keystore=/etc/flink-secrets/rest.keystore
>         - -D security.ssl.rest.truststore=/etc/flink-secrets/rest.truststore
> - -D security.ssl.rest.keystore-password=rest_keystore_password - -D
> security.ssl.rest.key-password=rest_keystore_password - -D
> security.ssl.rest.truststore-password=rest_truststore_password - -D
> security.ssl.internal.keystore=/etc/flink-secrets/internal.keystore - -D
> security.ssl.internal.truststore=/etc/flink-secrets/internal.keystore - -D
> security.ssl.internal.keystore-password=internal_store_password - -D
> security.ssl.internal.key-password=internal_store_password - -D
> security.ssl.internal.truststore-password=internal_store_password - -m
>         - tls-flink-cluster-1-11-jobmanager:8081
>         - /opt/flink/examples/batch/WordCount.jar
>         - --input
>         - /opt/flink/NOTICE
>         volumeMounts:
>           - name: flink-secret-volume
>             mountPath: /etc/flink-secrets
>       volumes:
>       - name: flink-secret-volume
>         secret:
>           secretName: flink-tls-secret-2
>       restartPolicy: Never
>  
> If I modify that to be a simple curl image but keeping the secrets mounted
> in, I can kubectl exec in and curl the JobManager at
> tls-flink-cluster-1-11-jobmanager:8081 - I get no response, but I get an
> error if I go to a different port or URL.
> The secrets do look ok inside the container too.
>  
> The Cluster spec looks like this now
>  
> apiVersion: flinkoperator.k8s.io/v1beta1
> kind: FlinkCluster
> metadata:
>   name: tls-flink-cluster-1-11
> spec:
>   jobManager:
>     volumeMounts:
>     - name: flink-secret-volume
>       mountPath: /etc/flink-secrets
>     volumes:
>     - name: flink-secret-volume
>       secret:
>         secretName: flink-tls-secret-2
>     resources:
>       limits:
>         memory: 600Mi
>         cpu: "1.0"
>   taskManager:
>     volumeMounts:
>       - name: flink-secret-volume
>         mountPath: /etc/flink-secrets
>     volumes:
>     - name: flink-secret-volume
>       secret:
>         secretName: flink-tls-secret-2
>     replicas: 1
>     resources:
>       limits:
>         memory: 1Gi
>         cpu: "1.0"
>   image:
>     name: adamroberts/mycoolflink:latest
>   flinkProperties:
>     #
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.htm
> l is helpful for this part. web.submit.enable: "false"
>     security.ssl.rest.enabled: "true"
>     security.ssl.rest.keystore: "/etc/flink-secrets/rest.keystore"
>     security.ssl.rest.truststore: "/etc/flink-secrets/rest.truststore"
>     security.ssl.rest.keystore-password: "rest_keystore_password"
>     security.ssl.rest.key-password: "rest_keystore_password"
>     security.ssl.rest.truststore-password: "rest_truststore_password"
>     security.ssl.internal.enabled: "true"
>     security.ssl.internal.keystore: "/etc/flink-secrets/internal.keystore"
>     security.ssl.internal.truststore: "/etc/flink-secrets/internal.keystore"
> security.ssl.internal.keystore-password: "internal_store_password"
> security.ssl.internal.key-password: "internal_store_password"
>     security.ssl.internal.truststore-password: "internal_store_password"
>     taskmanager.numberOfTaskSlots: "1"
>     jobmanager.heap.size: ""                # set empty value (only for
> Flink version 1.11 or above) jobmanager.memory.process.size: 1gb   # job
> manager memory limit  (only for Flink version 1.11 or above)
> taskmanager.heap.size: ""               # set empty value
>     taskmanager.memory.process.size: 1gb    # task manager memory limit
>  
> Cheers,
> ----- Original message -----
> From: Andrey Zagrebin <[hidden email]>
> To: Adam Roberts <[hidden email]>
> Cc: [hidden email], user <[hidden email]>
> Subject: [EXTERNAL] Re: Example flink run with security options? Running on
> k8s in my case Date: Wed, Aug 26, 2020 5:35 PM
>  
> Hi Adam,
>
> maybe also check your SSL setup in a local cluster to exclude possibly
> related k8s things.
>
> Best,
> Andrey
>  
> On Wed, Aug 26, 2020 at 3:59 PM Adam Roberts <[hidden email]> wrote:
> Hey Nico - thanks for the prompt response, good catch - I've just tried with
> the two security options (enabling rest and internal SSL communications)
> and still hit the same problem
> I've also tried turning off security (both in my Job definition and in my
> Flink cluster JobManager/TaskManager settings) and the communication does
> happen successfully, suggesting all is well otherwise.
> With regards to testing with just a regular curl, I switched security back
> on and did the curl, using this:
>
> openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in
> /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes
>
> curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081
>
> curl --cacert rest.pem --cert rest.pem
> tls-flink-cluster-1-11-jobmanager:8081
>
>  
> from the Job CR pod, which is who runs the flink run against my JobManager
> i'd like to connect to.
> That gives
>  
>
> $ openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in
> /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes
>
> curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081
>
> curl --cacert rest.pem --cert rest.pem
> tls-flink-cluster-1-11-jobmanager:8081139676043637888:error:0D07207B:asn1
> encoding routines:ASN1_get_object:header too
> long:../crypto/asn1/asn1_lib.c:101:
>
> so I wonder if my security set up itself is flawed...I'll be happy to share
> the scripting I have to do that if folks feel it'll be of use, thanks again
>
> ----- Original message -----
> From: Nico Kruber <[hidden email]>
> To: [hidden email]
> Cc: Adam Roberts <[hidden email]>
> Subject: [EXTERNAL] Re: Example flink run with security options? Running on
> k8s in my case Date: Wed, Aug 26, 2020 11:40 AM
>  
> Hi Adam,
> the flink binary will pick up any configuration from the flink-conf.yaml of
> its directory. If that is the same as in the cluster, you wouldn't have to
> pass most of your parameters manually. However, if you prefer not having a
> flink-conf.yaml in place, you could remove the security.ssl.internal.*
> parameter from its call since those only affect internal communication.
>
> If the client's connection to the JM is denied, you would actually have this
> in the JM logs as well which you could check.
>
> To check whether your whole setup works, I would suggest to try without
> security enabled first and then enable it (just to rule out any other
> issues)
>
> From the commands you mentioned, it looks like you're just missing
> security.ssl.rest.enabled=true and because of that, the client would not use
> SSL for the connection.
>
> For more information and setup, I recommend reading through [1] which also
> contains an example at the bottom of the page and how to use curl to test or
> use the REST endpoint.
>
>
> Nico
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-s
> sl.html
> On Tuesday, 25 August 2020 14:40:04 CEST Adam Roberts wrote:
> > Hey everyone, I've been experimenting with Flink
> > using https://github.com/GoogleCloudPlatform/flink-on-k8s-operator  and I
> > believe I've successfully deployed a JobManager and TaskManager with
> > security enabled, and a self-signed certificate (the pods come up great).
> > However, I can't do much with this - I can't port-forward and access the
> > UI, nor can I submit jobs to it by running another pod and using the DNS
> > name lookup of the service.
> > I always get
> >  
> > The program finished with the following exception:
> > org.apache.flink.client.program.ProgramInvocationException: The main
> > method
> > caused an error: java.util.concurrent.ExecutionException:
> > org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> > JobGraph.
> > ...
> >  
> > Caused by: org.apache.flink.runtime.rest.ConnectionClosedException:
> > Channel
> > became inactive. ... 37 more
> >  
> >  
> > and this is even with all of the -D security options provided.
> >  
> > The versions of Flink are the same for both my Job and my FlinkCluster
> > (1.11.1).
> > Is this a sensible thing to do? If I weren't using the operator for
> > example, would users be expected to flink run with all of these options?
> > Does anything look odd here? My guess is because security's on, the Job
> > Manager refuses to talk to my submitter.
> > Running as the flink user in the container, I do
> >  
> >
> >       securityContext:
> >
> >         runAsUser: 9999
> >
> >         runAsGroup: 9999
> >
> >       containers:
> >
> >       - name: wordcount
> >
> >         image: adamroberts/mycoolflink:latest
> >
> >         args:
> >
> >         - /opt/flink/bin/flink
> >
> >         - run
> >
> >         - -D
> >
> >         -
> > security.ssl.rest.keystore=/etc/flink-secrets/flink-tls-keystore.key
> >
> >         - -D
> >
> >         -
> > security.ssl.rest.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jk
> > s
> >
> >         - -D
> >
> >         - security.ssl.rest.keystore-password=thepass # Replace with value
> > of flink-tls-keystore.password
> >
> >         - -D
> >
> >         - security.ssl.rest.key-password=thepass # Replace with value of
> > tls.p12.password
> >
> >         - -D
> >
> >         - security.ssl.rest.truststore-password=thepass # Replace with
> > value of flink-tls-ca.truststore.password
> >
> >         - -D
> >
> >         -
> > security.ssl.internal.keystore=/etc/flink-secrets/flink-tls-keystore.key
> >
> >         - -D
> >
> >         -
> > security.ssl.internal.truststore=/etc/flink-secrets/flink-tls-ca-truststor
> > e
> > .jks
> >
> >         - -D
> >
> >         - security.ssl.internal.keystore-password=thepass # Replace with
> > value of flink-tls-keystore.password
> >
> >         - -D
> >
> >         - security.ssl.internal.key-password=thepass # Replace with value
> > of flink-tls-keystore.password
> >
> >         - -D
> >
> >         - security.ssl.internal.truststore-password=thepass # Replace with
> > value of flink-tls-truststore.password
> >
> >         - -m
> >
> >         - tls-flink-cluster-1-11-jobmanager:8081
> >
> >         - /opt/flink/examples/batch/WordCount.jar
> >
> >         - --input
> >
> >         - /opt/flink/NOTICE
> >
> >  
> > with the secrets mounted in at the above location (if I exec into my
> > container, I can see they're all there OK). Note that it is a read-only
> > file system.
> > adamroberts/mycoolflink (at this time of this email) is just based
> > on https://github.com/apache/flink-docker .
> > Thanks!
> >  
> > Unless stated otherwise above:
> > IBM United Kingdom Limited - Registered in England and Wales with number
> > 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> > PO6 3AU
>
>  
>  
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> PO6 3AU
>  
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> PO6 3AU

--
Dr. Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
Wehner
--
Dr. Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
Wehner

signature.asc (201 bytes) Download Attachment