Do we have an example of setting up Queryable state ( proxies, client etc ) on k8s ?


Vishal Santoshi
I have 2 options:

1. A REST-based (in my case Jetty/REST) QueryableStateClient in a sidecar container colocated with the JM (it could be on all TMs, but that looks like overkill).

2. A REST-based (in my case Jetty/REST) QueryableStateClient in a sidecar container colocated with the TMs. The query proxies are on the TMs, so in essence the communication would be between containers of the same pod, and I could load balance (I have yet to test this).

The second alternative seems doable but looks like overkill, and I am not sure how to point the standalone QueryableStateClient at a TM, given that the TM's pod IP is not known until the pod is launched.

Has anyone had a successful QueryableState setup for Flink on k8s?

Regards,

Re: Do we have an example of setting up Queryable state ( proxies, client etc ) on k8s ?

Vishal Santoshi

I think I got a handle on this. For those who might want to do this, here are the steps (I could share the Jetty/Jersey REST code too if required).


1. Create a sidecar container on each pod that has a TM. I wrote a simple Jetty/Jersey REST-based server that executes queries against the local TM query server.

  .
  .
  - name: queryable-state
    image: _IMAGE_
    args: ["queryable-state"]
    env:
      - name: POD_IP
        valueFrom:
          fieldRef:
            fieldPath: status.podIP
    ports:
      - containerPort: 9999
        name: qstate-client
    resources:
      requests:
        cpu: "0.25"
        memory: "256Mi"


   Note that POD_IP is the IP the REST-based server uses to start the QueryableStateClient, and the port it connects to is the default port of the TM query server (9069, I think) of the colocated TM container.



2. Expose the port ( in this case 9999 ) at the k8s service layer.



And that did it. 
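
For step 2, a Service along the following lines could expose the sidecar's REST port. This is only a sketch: the Service name and the selector labels (app: flink, component: taskmanager) are assumptions and must match whatever labels the TM pods actually carry; only the port 9999 and the port name qstate-client come from the container spec above.

```yaml
# Sketch only: Service name and selector labels are hypothetical.
apiVersion: v1
kind: Service
metadata:
  name: flink-qstate-rest          # hypothetical name
spec:
  selector:
    app: flink                     # assumed label on the TM pods
    component: taskmanager         # assumed label on the TM pods
  ports:
    - name: qstate-client
      port: 9999                   # port the Service listens on
      targetPort: qstate-client    # named containerPort of the sidecar (9999)
```

Because the selector matches every TM pod, requests to the Service are spread across all the sidecars, and each sidecar queries its own colocated TM.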








I am, though, worried about a couple of things:


1. The TM query server will ask the JM for the key group, and hence the TM, that a key belongs to for every request, and then coordinate the communication between the client and that TM. Does Flink do any optimization, such as caching the key ranges and thus the affinity to a TM, to reduce stress on the JM? I would imagine that, this being some well-known distribution function on some well-known hash algorithm, an incoming key could be pinned to a TM without visiting the JM more than once.


2. We do have use cases where we would want to iterate over all the keys in a key group (and by extension on a TM) for a job. Is that a possibility?


3. The overhead of having as many client containers as TMs.



Any advice/ideas on the 3 worry points?



Regards



Re: Do we have an example of setting up Queryable state ( proxies, client etc ) on k8s ?

Konstantin Knauf-2
Hi Vishal,

my approach would be a single Kubernetes service, which is backed by all Taskmanagers of the job. The Taskmanagers will proxy the request for a specific key to the correct Taskmanager. Yes, the Taskmanagers will cache the location of the key groups.

In addition to this Kubernetes service, you can of course have a Jetty/Jersey REST based server that sends queries to this service.
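
A single Service backed by all TaskManagers could be sketched as below. The Service name and selector labels are assumptions and need to match the labels on the TaskManager pods; 9069 is assumed to be the queryable state proxy port, as mentioned earlier in the thread, and should be changed if the proxy port is configured differently.

```yaml
# Sketch only: Service name and selector labels are hypothetical.
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-qstate   # hypothetical name
spec:
  selector:
    app: flink                     # assumed label on the TaskManager pods
    component: taskmanager         # assumed label on the TaskManager pods
  ports:
    - name: qstate-proxy
      port: 9069                   # assumed queryable state proxy port
      targetPort: 9069
```

A QueryableStateClient, or a REST server wrapping one, could then be pointed at the Service's DNS name and port 9069 instead of at an individual pod IP.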

Please let me know if this works for you.

Hope this helps and cheers,

Konstantin




--

Konstantin Knauf | Solutions Architect

+49 160 91394525


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen   

Re: Do we have an example of setting up Queryable state ( proxies, client etc ) on k8s ?

Vishal Santoshi
Thanks Konstantin,

That makes sense. To give you some context, the reason we are gravitating towards Queryable State is Prometheus's architectural preference for scraping (a pull rather than a push model) and our intent to expose aggregations. That said, your idea makes sense. The worry I had is the IP resolution of TMs that QueryableStateClient does, and our wanting to avoid static IPs. If I understand correctly, you are proposing a proxy "external" to the job deployment, as in an external service that discovers the job and works off the ingress endpoint that exposes the queryable port of the TMs?

That creates a fragmented architecture that I wanted to avoid, if I understood your advice correctly.

Vishal




            


Re: Do we have an example of setting up Queryable state ( proxies, client etc ) on k8s ?

Vishal Santoshi

Konstantin,

I take back my reservations. My initial reservation was about having 2 services (one for the TMs and one for the native Queryable Client proxy). Having established this setup, though, it makes sense. Having the native Queryable Client proxy as a sidecar deeply couples the query layer with the TMs, inhibiting independent development of the query layer.

Thanks.


