Deployment/Memory Configuration/Scalability

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Deployment/Memory Configuration/Scalability

Radoslav Smilyanov
Hi all,

I am having multiple questions regarding Flink :) Let me give you some background of what I have done so far.

Description
I am using Flink 1.11.2. My job is doing data enrichment. Data is consumed from 6 different kafka topics and it is joined via multiple CoProcessFunctions. On a daily basis the job is handling ~20 millions events from the source kafka topics. 

Configuration
These are the settings I am using:

jobmanager.memory.process.size: 4096m
jobmanager.memory.off-heap.size: 512m
taskmanager.memory.process.size: 12000m
taskmanager.memory.task.off-heap.size: 512m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 5
taskmanager.rpc.port: 6122
jobmanager.execution.failover-strategy: region
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
state.backend.rocksdb.block.cache-size: 64mb
state.checkpoints.dir: s3://bucket/checkpoints
state.savepoints.dir: s3://bucket/savepoints
s3.access-key: AWS_ACCESS_KEY_ID
s3.secret-key: AWS_SECRET_ACCESS_KEY
s3.endpoint: http://<internal_url>
s3.path.style.access: true
s3.entropy.key: _entropy_
s3.entropy.length: 8
presto.s3.socket-timeout: 10m
client.timeout: 60min

Deployment setup
Flink is deployed in k8s with Per-Job mode having 1 job manager and 5 task managers. I have a daily cron job which triggers savepoint in order to have a fresh copy of the whole state.

Problems with the existing setup
1. I observe that savepoints are causing Flink to consume more than the allowed memory. I observe the behavior described in this stackoverflow post (which seems to be solved in 1.12.X if I am getting it right).
2. I cannot achieve high availability with Per-Job mode and thus I ended up having a regular savepoint on a daily basis.

Questions
1. Is it a good idea to have regular savepoints (say on a daily basis)?
2. Is it possible to have high availability with Per-Job mode? Or maybe I should go with session mode and make sure that my flink cluster is running a single job?
3. Let's assume that savepoints should be triggered only before job update/deployment. How can I trigger a savepoint if my job is already consuming more than 80% of the allowed memory per pod in k8s? My observations show that k8s kills task managers (which are running as pods) and I need to retry it a couple of times.
4. Should I consider upgrading to version 1.12.3?
5. Should I consider switching off state.backend.rocksdb.memory.managed property even in version 1.12.3?
6. How do I decide when the job parallelism should be increased? Are there some metrics which can lead me to a clue that the parallelism should be increased?

Best Regards,
Rado



Reply | Threaded
Open this post in threaded view
|

Re: Deployment/Memory Configuration/Scalability

Yangze Guo
Hi, Radoslav,

> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I should go with session mode and make sure that my flink cluster is running a single job?

Yes, we can achieve HA with per-job mode with ZooKeeper[2]. Look at
your configuration, you need to also enable the checkpoint[2], which
is automatically triggered and helps you to resume the program when
failure, by setting the execution.checkpointing.interval.

> 3. Let's assume that savepoints should be triggered only before job update/deployment. How can I trigger a savepoint if my job is already consuming more than 80% of the allowed memory per pod in k8s? My observations show that k8s kills task managers (which are running as pods) and I need to retry it a couple of times.

I think with the checkpoint, you no longer need to trigger the
savepoint manually with a specific condition as the checkpoint will be
periodically triggered.

> 4. Should I consider upgrading to version 1.12.3?
> 5. Should I consider switching off state.backend.rocksdb.memory.managed property even in version 1.12.3?

I'm not an expert on the state backend, but it seems the fix of that
issue is only applied to the docker image. So I guess you can package
a custom image yourselves if you do not want to upgrade. However, if
you are using the Native K8S mode[3] and there is no compatibility
issue, I think it might be good to upgrading because there are also
lots of improvements[4] in 1.12.

> 6. How do I decide when the job parallelism should be increased? Are there some metrics which can lead me to a clue that the parallelism should be increased?

As there are 6 Kafka sources in your job, I think the parallelism
should first be fixed with the topic partition number. For metrics,
you could refer to the backpressure of tasks and
numRecordsOutPerSecond[5].

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/zookeeper_ha/
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
[4] https://issues.apache.org/jira/browse/FLINK-17709
[5] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#io

Best,
Yangze Guo

On Mon, Apr 26, 2021 at 4:14 PM Radoslav Smilyanov
<[hidden email]> wrote:

>
> Hi all,
>
> I am having multiple questions regarding Flink :) Let me give you some background of what I have done so far.
>
> Description
> I am using Flink 1.11.2. My job is doing data enrichment. Data is consumed from 6 different kafka topics and it is joined via multiple CoProcessFunctions. On a daily basis the job is handling ~20 millions events from the source kafka topics.
>
> Configuration
> These are the settings I am using:
>
> jobmanager.memory.process.size: 4096m
> jobmanager.memory.off-heap.size: 512m
> taskmanager.memory.process.size: 12000m
> taskmanager.memory.task.off-heap.size: 512m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 5
> taskmanager.rpc.port: 6122
> jobmanager.execution.failover-strategy: region
> state.backend: rocksdb
> state.backend.incremental: true
> state.backend.rocksdb.localdir: /opt/flink/rocksdb
> state.backend.rocksdb.memory.managed: true
> state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
> state.backend.rocksdb.block.cache-size: 64mb
> state.checkpoints.dir: s3://bucket/checkpoints
> state.savepoints.dir: s3://bucket/savepoints
> s3.access-key: AWS_ACCESS_KEY_ID
> s3.secret-key: AWS_SECRET_ACCESS_KEY
> s3.endpoint: http://<internal_url>
> s3.path.style.access: true
> s3.entropy.key: _entropy_
> s3.entropy.length: 8
> presto.s3.socket-timeout: 10m
> client.timeout: 60min
>
> Deployment setup
> Flink is deployed in k8s with Per-Job mode having 1 job manager and 5 task managers. I have a daily cron job which triggers savepoint in order to have a fresh copy of the whole state.
>
> Problems with the existing setup
> 1. I observe that savepoints are causing Flink to consume more than the allowed memory. I observe the behavior described in this stackoverflow post (which seems to be solved in 1.12.X if I am getting it right).
> 2. I cannot achieve high availability with Per-Job mode and thus I ended up having a regular savepoint on a daily basis.
>
> Questions
> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I should go with session mode and make sure that my flink cluster is running a single job?
> 3. Let's assume that savepoints should be triggered only before job update/deployment. How can I trigger a savepoint if my job is already consuming more than 80% of the allowed memory per pod in k8s? My observations show that k8s kills task managers (which are running as pods) and I need to retry it a couple of times.
> 4. Should I consider upgrading to version 1.12.3?
> 5. Should I consider switching off state.backend.rocksdb.memory.managed property even in version 1.12.3?
> 6. How do I decide when the job parallelism should be increased? Are there some metrics which can lead me to a clue that the parallelism should be increased?
>
> Best Regards,
> Rado
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Deployment/Memory Configuration/Scalability

Radoslav Smilyanov
Hi Yangze Guo,

Thanks for your reply.

> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I should go with session mode and make sure that my flink cluster is running a single job?
Yes, we can achieve HA with per-job mode with ZooKeeper[2]. Look at
your configuration, you need to also enable the checkpoint[2], which
is automatically triggered and helps you to resume the program when
failure, by setting the execution.checkpointing.interval.
 
I forgot to add the checkpoint configuration since it's part of a custom job configuration which is mounted in each pod. So checkpoints are enabled. :)
That's why savepoint is triggered on a daily basis since the existing deployment setup has a single Job Manager.
I will take a look at k8s or Zookeeper HA options.

> 3. Let's assume that savepoints should be triggered only before job update/deployment. How can I trigger a savepoint if my job is already consuming more than 80% of the allowed memory per pod in k8s? My observations show that k8s kills task managers (which are running as pods) and I need to retry it a couple of times.
I think with the checkpoint, you no longer need to trigger the
savepoint manually with a specific condition as the checkpoint will be
periodically triggered.

Checkpoints are already enabled (once per every 10 minutes). Once HA is setuped correctly I think that savepoints can be used only when the job needs to be updated.  

> 6. How do I decide when the job parallelism should be increased? Are there some metrics which can lead me to a clue that the parallelism should be increased?
As there are 6 Kafka sources in your job, I think the parallelism
should first be fixed with the topic partition number. For metrics,
you could refer to the backpressure of tasks and
numRecordsOutPerSecond[5].

Currently I am using parallelism which is equal to the highest number of kafka topic partitions. Unfortunately some of the topics have higher load compared to others and thus some of them are having 1 partition while others are having 4 partitions (for example).

Thanks,
Rado

On Tue, Apr 27, 2021 at 7:50 AM Yangze Guo <[hidden email]> wrote:
Hi, Radoslav,

> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I should go with session mode and make sure that my flink cluster is running a single job?

Yes, we can achieve HA with per-job mode with ZooKeeper[2]. Look at
your configuration, you need to also enable the checkpoint[2], which
is automatically triggered and helps you to resume the program when
failure, by setting the execution.checkpointing.interval.

> 3. Let's assume that savepoints should be triggered only before job update/deployment. How can I trigger a savepoint if my job is already consuming more than 80% of the allowed memory per pod in k8s? My observations show that k8s kills task managers (which are running as pods) and I need to retry it a couple of times.

I think with the checkpoint, you no longer need to trigger the
savepoint manually with a specific condition as the checkpoint will be
periodically triggered.

> 4. Should I consider upgrading to version 1.12.3?
> 5. Should I consider switching off state.backend.rocksdb.memory.managed property even in version 1.12.3?

I'm not an expert on the state backend, but it seems the fix of that
issue is only applied to the docker image. So I guess you can package
a custom image yourselves if you do not want to upgrade. However, if
you are using the Native K8S mode[3] and there is no compatibility
issue, I think it might be good to upgrading because there are also
lots of improvements[4] in 1.12.

> 6. How do I decide when the job parallelism should be increased? Are there some metrics which can lead me to a clue that the parallelism should be increased?

As there are 6 Kafka sources in your job, I think the parallelism
should first be fixed with the topic partition number. For metrics,
you could refer to the backpressure of tasks and
numRecordsOutPerSecond[5].

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/zookeeper_ha/
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
[4] https://issues.apache.org/jira/browse/FLINK-17709
[5] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#io

Best,
Yangze Guo

On Mon, Apr 26, 2021 at 4:14 PM Radoslav Smilyanov
<[hidden email]> wrote:
>
> Hi all,
>
> I am having multiple questions regarding Flink :) Let me give you some background of what I have done so far.
>
> Description
> I am using Flink 1.11.2. My job is doing data enrichment. Data is consumed from 6 different kafka topics and it is joined via multiple CoProcessFunctions. On a daily basis the job is handling ~20 millions events from the source kafka topics.
>
> Configuration
> These are the settings I am using:
>
> jobmanager.memory.process.size: 4096m
> jobmanager.memory.off-heap.size: 512m
> taskmanager.memory.process.size: 12000m
> taskmanager.memory.task.off-heap.size: 512m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 5
> taskmanager.rpc.port: 6122
> jobmanager.execution.failover-strategy: region
> state.backend: rocksdb
> state.backend.incremental: true
> state.backend.rocksdb.localdir: /opt/flink/rocksdb
> state.backend.rocksdb.memory.managed: true
> state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
> state.backend.rocksdb.block.cache-size: 64mb
> state.checkpoints.dir: s3://bucket/checkpoints
> state.savepoints.dir: s3://bucket/savepoints
> s3.access-key: AWS_ACCESS_KEY_ID
> s3.secret-key: AWS_SECRET_ACCESS_KEY
> s3.endpoint: http://<internal_url>
> s3.path.style.access: true
> s3.entropy.key: _entropy_
> s3.entropy.length: 8
> presto.s3.socket-timeout: 10m
> client.timeout: 60min
>
> Deployment setup
> Flink is deployed in k8s with Per-Job mode having 1 job manager and 5 task managers. I have a daily cron job which triggers savepoint in order to have a fresh copy of the whole state.
>
> Problems with the existing setup
> 1. I observe that savepoints are causing Flink to consume more than the allowed memory. I observe the behavior described in this stackoverflow post (which seems to be solved in 1.12.X if I am getting it right).
> 2. I cannot achieve high availability with Per-Job mode and thus I ended up having a regular savepoint on a daily basis.
>
> Questions
> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I should go with session mode and make sure that my flink cluster is running a single job?
> 3. Let's assume that savepoints should be triggered only before job update/deployment. How can I trigger a savepoint if my job is already consuming more than 80% of the allowed memory per pod in k8s? My observations show that k8s kills task managers (which are running as pods) and I need to retry it a couple of times.
> 4. Should I consider upgrading to version 1.12.3?
> 5. Should I consider switching off state.backend.rocksdb.memory.managed property even in version 1.12.3?
> 6. How do I decide when the job parallelism should be increased? Are there some metrics which can lead me to a clue that the parallelism should be increased?
>
> Best Regards,
> Rado
>
>
>