scaling issue Running Flink on Kubernetes

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

scaling issue Running Flink on Kubernetes

Eleanore Jin
Hi Experts, 
I have my flink application running on Kubernetes, initially with 1 Job Manager, and 2 Task Managers. 

Then we have the custom operator that watches for the CRD, when the CRD replicas changed, it will patch the Flink Job Manager deployment parallelism and max parallelism according to the replicas from CRD (parallelism can be configured via env variables for our application). which causes the job manager restart. hence a new Flink job. But the consumer group does not change, so it will continue from the offset where it left.

In addition, operator will also update Task Manager's deployment replicas, and will adjust the pod number.

In case of scale up, the existing task manager pods do not get killed, but new task manager pods will be created. 

And we observed a skew in the partition offset consumed. e.g. some partitions have huge lags and other partitions have small lags. (observed from burrow)

This is also validated by the metrics from Flink UI, showing the throughput differs for slotss

Any clue why this is the case? 

Thanks a lot!
Eleanore
Reply | Threaded
Open this post in threaded view
|

Re: scaling issue Running Flink on Kubernetes

Xintong Song
Hi Eleanore,

I have a few more questions regarding your issue.
  • Which Flink version are you using?
  • Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value?
  • Keeping the job running a while after the scale-up, does the skew ease?
I suspect the performance difference might be an outcome of some warming up issues. E.g., the existing TMs might have some file already localized, or some memory buffers already promoted to the JVM tenured area, while the new TMs have not.

Thank you~

Xintong Song



On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin <[hidden email]> wrote:
Hi Experts, 
I have my flink application running on Kubernetes, initially with 1 Job Manager, and 2 Task Managers. 

Then we have the custom operator that watches for the CRD, when the CRD replicas changed, it will patch the Flink Job Manager deployment parallelism and max parallelism according to the replicas from CRD (parallelism can be configured via env variables for our application). which causes the job manager restart. hence a new Flink job. But the consumer group does not change, so it will continue from the offset where it left.

In addition, operator will also update Task Manager's deployment replicas, and will adjust the pod number.

In case of scale up, the existing task manager pods do not get killed, but new task manager pods will be created. 

And we observed a skew in the partition offset consumed. e.g. some partitions have huge lags and other partitions have small lags. (observed from burrow)

This is also validated by the metrics from Flink UI, showing the throughput differs for slotss

Any clue why this is the case? 

Thanks a lot!
Eleanore
Reply | Threaded
Open this post in threaded view
|

Re: scaling issue Running Flink on Kubernetes

Eleanore Jin
_Hi Xintong, 

Thanks for the prompt reply! To answer your question:
  • Which Flink version are you using?
               v1.8.2
  • Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value?
               I also tried this, it seems skew also happens even I do not change the parallelism, so it may not caused by scale-up/down
  • Keeping the job running a while after the scale-up, does the skew ease?
               So the skew happens in such a way that: some partitions lags down to 0, but other partitions are still at level of 10_000, and I am seeing the back pressure is ok.

Thanks a lot!
Eleanore


On Tue, Mar 10, 2020 at 7:03 PM Xintong Song <[hidden email]> wrote:
Hi Eleanore,

I have a few more questions regarding your issue.
  • Which Flink version are you using?
  • Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value?
  • Keeping the job running a while after the scale-up, does the skew ease?
I suspect the performance difference might be an outcome of some warming up issues. E.g., the existing TMs might have some file already localized, or some memory buffers already promoted to the JVM tenured area, while the new TMs have not.

Thank you~

Xintong Song



On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin <[hidden email]> wrote:
Hi Experts, 
I have my flink application running on Kubernetes, initially with 1 Job Manager, and 2 Task Managers. 

Then we have the custom operator that watches for the CRD, when the CRD replicas changed, it will patch the Flink Job Manager deployment parallelism and max parallelism according to the replicas from CRD (parallelism can be configured via env variables for our application). which causes the job manager restart. hence a new Flink job. But the consumer group does not change, so it will continue from the offset where it left.

In addition, operator will also update Task Manager's deployment replicas, and will adjust the pod number.

In case of scale up, the existing task manager pods do not get killed, but new task manager pods will be created. 

And we observed a skew in the partition offset consumed. e.g. some partitions have huge lags and other partitions have small lags. (observed from burrow)

This is also validated by the metrics from Flink UI, showing the throughput differs for slotss

Any clue why this is the case? 

Thanks a lot!
Eleanore
Reply | Threaded
Open this post in threaded view
|

Re: scaling issue Running Flink on Kubernetes

Xintong Song
Hi Eleanore,

That does't sound like a scaling issue. It's probably a data skew, that the data volume on some of the keys are significantly higher than others. I'm not familiar with this area though, and have copied Jark for you, who is one of the community experts in this area.

Thank you~

Xintong Song



On Wed, Mar 11, 2020 at 10:37 AM Eleanore Jin <[hidden email]> wrote:
_Hi Xintong, 

Thanks for the prompt reply! To answer your question:
  • Which Flink version are you using?
               v1.8.2
  • Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value?
               I also tried this, it seems skew also happens even I do not change the parallelism, so it may not caused by scale-up/down
  • Keeping the job running a while after the scale-up, does the skew ease?
               So the skew happens in such a way that: some partitions lags down to 0, but other partitions are still at level of 10_000, and I am seeing the back pressure is ok.

Thanks a lot!
Eleanore


On Tue, Mar 10, 2020 at 7:03 PM Xintong Song <[hidden email]> wrote:
Hi Eleanore,

I have a few more questions regarding your issue.
  • Which Flink version are you using?
  • Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value?
  • Keeping the job running a while after the scale-up, does the skew ease?
I suspect the performance difference might be an outcome of some warming up issues. E.g., the existing TMs might have some file already localized, or some memory buffers already promoted to the JVM tenured area, while the new TMs have not.

Thank you~

Xintong Song



On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin <[hidden email]> wrote:
Hi Experts, 
I have my flink application running on Kubernetes, initially with 1 Job Manager, and 2 Task Managers. 

Then we have the custom operator that watches for the CRD, when the CRD replicas changed, it will patch the Flink Job Manager deployment parallelism and max parallelism according to the replicas from CRD (parallelism can be configured via env variables for our application). which causes the job manager restart. hence a new Flink job. But the consumer group does not change, so it will continue from the offset where it left.

In addition, operator will also update Task Manager's deployment replicas, and will adjust the pod number.

In case of scale up, the existing task manager pods do not get killed, but new task manager pods will be created. 

And we observed a skew in the partition offset consumed. e.g. some partitions have huge lags and other partitions have small lags. (observed from burrow)

This is also validated by the metrics from Flink UI, showing the throughput differs for slotss

Any clue why this is the case? 

Thanks a lot!
Eleanore
Reply | Threaded
Open this post in threaded view
|

Re: scaling issue Running Flink on Kubernetes

Flavio Pompermaier

On Wed, Mar 11, 2020 at 4:46 AM Xintong Song <[hidden email]> wrote:
Hi Eleanore,

That does't sound like a scaling issue. It's probably a data skew, that the data volume on some of the keys are significantly higher than others. I'm not familiar with this area though, and have copied Jark for you, who is one of the community experts in this area.

Thank you~

Xintong Song



On Wed, Mar 11, 2020 at 10:37 AM Eleanore Jin <[hidden email]> wrote:
_Hi Xintong, 

Thanks for the prompt reply! To answer your question:
  • Which Flink version are you using?
               v1.8.2
  • Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value?
               I also tried this, it seems skew also happens even I do not change the parallelism, so it may not caused by scale-up/down
  • Keeping the job running a while after the scale-up, does the skew ease?
               So the skew happens in such a way that: some partitions lags down to 0, but other partitions are still at level of 10_000, and I am seeing the back pressure is ok.

Thanks a lot!
Eleanore


On Tue, Mar 10, 2020 at 7:03 PM Xintong Song <[hidden email]> wrote:
Hi Eleanore,

I have a few more questions regarding your issue.
  • Which Flink version are you using?
  • Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value?
  • Keeping the job running a while after the scale-up, does the skew ease?
I suspect the performance difference might be an outcome of some warming up issues. E.g., the existing TMs might have some file already localized, or some memory buffers already promoted to the JVM tenured area, while the new TMs have not.

Thank you~

Xintong Song



On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin <[hidden email]> wrote:
Hi Experts, 
I have my flink application running on Kubernetes, initially with 1 Job Manager, and 2 Task Managers. 

Then we have the custom operator that watches for the CRD, when the CRD replicas changed, it will patch the Flink Job Manager deployment parallelism and max parallelism according to the replicas from CRD (parallelism can be configured via env variables for our application). which causes the job manager restart. hence a new Flink job. But the consumer group does not change, so it will continue from the offset where it left.

In addition, operator will also update Task Manager's deployment replicas, and will adjust the pod number.

In case of scale up, the existing task manager pods do not get killed, but new task manager pods will be created. 

And we observed a skew in the partition offset consumed. e.g. some partitions have huge lags and other partitions have small lags. (observed from burrow)

This is also validated by the metrics from Flink UI, showing the throughput differs for slotss

Any clue why this is the case? 

Thanks a lot!
Eleanore

Reply | Threaded
Open this post in threaded view
|

Re: scaling issue Running Flink on Kubernetes

Flavio Pompermaier
Sorry I wanted to mention https://github.com/lyft/flinkk8soperator (I don't know which one of the 2 is better)

On Wed, Mar 11, 2020 at 10:19 AM Flavio Pompermaier <[hidden email]> wrote:

On Wed, Mar 11, 2020 at 4:46 AM Xintong Song <[hidden email]> wrote:
Hi Eleanore,

That does't sound like a scaling issue. It's probably a data skew, that the data volume on some of the keys are significantly higher than others. I'm not familiar with this area though, and have copied Jark for you, who is one of the community experts in this area.

Thank you~

Xintong Song



On Wed, Mar 11, 2020 at 10:37 AM Eleanore Jin <[hidden email]> wrote:
_Hi Xintong, 

Thanks for the prompt reply! To answer your question:
  • Which Flink version are you using?
               v1.8.2
  • Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value?
               I also tried this, it seems skew also happens even I do not change the parallelism, so it may not caused by scale-up/down
  • Keeping the job running a while after the scale-up, does the skew ease?
               So the skew happens in such a way that: some partitions lags down to 0, but other partitions are still at level of 10_000, and I am seeing the back pressure is ok.

Thanks a lot!
Eleanore


On Tue, Mar 10, 2020 at 7:03 PM Xintong Song <[hidden email]> wrote:
Hi Eleanore,

I have a few more questions regarding your issue.
  • Which Flink version are you using?
  • Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value?
  • Keeping the job running a while after the scale-up, does the skew ease?
I suspect the performance difference might be an outcome of some warming up issues. E.g., the existing TMs might have some file already localized, or some memory buffers already promoted to the JVM tenured area, while the new TMs have not.

Thank you~

Xintong Song



On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin <[hidden email]> wrote:
Hi Experts, 
I have my flink application running on Kubernetes, initially with 1 Job Manager, and 2 Task Managers. 

Then we have the custom operator that watches for the CRD, when the CRD replicas changed, it will patch the Flink Job Manager deployment parallelism and max parallelism according to the replicas from CRD (parallelism can be configured via env variables for our application). which causes the job manager restart. hence a new Flink job. But the consumer group does not change, so it will continue from the offset where it left.

In addition, operator will also update Task Manager's deployment replicas, and will adjust the pod number.

In case of scale up, the existing task manager pods do not get killed, but new task manager pods will be created. 

And we observed a skew in the partition offset consumed. e.g. some partitions have huge lags and other partitions have small lags. (observed from burrow)

This is also validated by the metrics from Flink UI, showing the throughput differs for slotss

Any clue why this is the case? 

Thanks a lot!
Eleanore


Reply | Threaded
Open this post in threaded view
|

Re: scaling issue Running Flink on Kubernetes

Eleanore Jin
Hi Flavio,

We have implemented our own flink operator, the operator will start a flink job cluster (the application jar is already packaged together with flink in the docker image). I believe Google's flink operator will start a session cluster, and user can submit the flink job via REST. Not looked into lyft one before. 

Eleanore


On Wed, Mar 11, 2020 at 2:21 AM Flavio Pompermaier <[hidden email]> wrote:
Sorry I wanted to mention https://github.com/lyft/flinkk8soperator (I don't know which one of the 2 is better)

On Wed, Mar 11, 2020 at 10:19 AM Flavio Pompermaier <[hidden email]> wrote:

On Wed, Mar 11, 2020 at 4:46 AM Xintong Song <[hidden email]> wrote:
Hi Eleanore,

That does't sound like a scaling issue. It's probably a data skew, that the data volume on some of the keys are significantly higher than others. I'm not familiar with this area though, and have copied Jark for you, who is one of the community experts in this area.

Thank you~

Xintong Song



On Wed, Mar 11, 2020 at 10:37 AM Eleanore Jin <[hidden email]> wrote:
_Hi Xintong, 

Thanks for the prompt reply! To answer your question:
  • Which Flink version are you using?
               v1.8.2
  • Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value?
               I also tried this, it seems skew also happens even I do not change the parallelism, so it may not caused by scale-up/down
  • Keeping the job running a while after the scale-up, does the skew ease?
               So the skew happens in such a way that: some partitions lags down to 0, but other partitions are still at level of 10_000, and I am seeing the back pressure is ok.

Thanks a lot!
Eleanore


On Tue, Mar 10, 2020 at 7:03 PM Xintong Song <[hidden email]> wrote:
Hi Eleanore,

I have a few more questions regarding your issue.
  • Which Flink version are you using?
  • Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value?
  • Keeping the job running a while after the scale-up, does the skew ease?
I suspect the performance difference might be an outcome of some warming up issues. E.g., the existing TMs might have some file already localized, or some memory buffers already promoted to the JVM tenured area, while the new TMs have not.

Thank you~

Xintong Song



On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin <[hidden email]> wrote:
Hi Experts, 
I have my flink application running on Kubernetes, initially with 1 Job Manager, and 2 Task Managers. 

Then we have the custom operator that watches for the CRD, when the CRD replicas changed, it will patch the Flink Job Manager deployment parallelism and max parallelism according to the replicas from CRD (parallelism can be configured via env variables for our application). which causes the job manager restart. hence a new Flink job. But the consumer group does not change, so it will continue from the offset where it left.

In addition, operator will also update Task Manager's deployment replicas, and will adjust the pod number.

In case of scale up, the existing task manager pods do not get killed, but new task manager pods will be created. 

And we observed a skew in the partition offset consumed. e.g. some partitions have huge lags and other partitions have small lags. (observed from burrow)

This is also validated by the metrics from Flink UI, showing the throughput differs for slotss

Any clue why this is the case? 

Thanks a lot!
Eleanore