Hi all,
I have multiple questions regarding Flink :) Let me give you some background on what I have done so far.

Description
I am using Flink 1.11.2. My job does data enrichment. Data is consumed from 6 different Kafka topics and joined via multiple CoProcessFunctions. On a daily basis the job handles ~20 million events from the source Kafka topics.

Configuration
These are the settings I am using:

jobmanager.memory.process.size: 4096m
jobmanager.memory.off-heap.size: 512m
taskmanager.memory.process.size: 12000m
taskmanager.memory.task.off-heap.size: 512m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 5
taskmanager.rpc.port: 6122
jobmanager.execution.failover-strategy: region
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
state.backend.rocksdb.block.cache-size: 64mb
state.checkpoints.dir: s3://bucket/checkpoints
state.savepoints.dir: s3://bucket/savepoints
s3.access-key: AWS_ACCESS_KEY_ID
s3.secret-key: AWS_SECRET_ACCESS_KEY
s3.endpoint: http://<internal_url>
s3.path.style.access: true
s3.entropy.key: _entropy_
s3.entropy.length: 8
presto.s3.socket-timeout: 10m
client.timeout: 60min

Deployment setup
Flink is deployed in k8s in Per-Job mode with 1 job manager and 5 task managers. I have a daily cron job which triggers a savepoint in order to have a fresh copy of the whole state.

Problems with the existing setup
1. I observe that savepoints cause Flink to consume more than the allowed memory. I see the behavior described in this stackoverflow post (which seems to be solved in 1.12.x, if I am getting it right).
2. I cannot achieve high availability with Per-Job mode, and thus I ended up taking a regular savepoint on a daily basis.

Questions
1. Is it a good idea to take regular savepoints (say on a daily basis)?
2. Is it possible to have high availability with Per-Job mode? Or should I go with session mode and make sure that my Flink cluster runs a single job?
3. Let's assume that savepoints should be triggered only before a job update/deployment. How can I trigger a savepoint if my job is already consuming more than 80% of the allowed memory per pod in k8s? My observations show that k8s kills task managers (which run as pods) and I need to retry a couple of times.
4. Should I consider upgrading to version 1.12.3?
5. Should I consider switching off the state.backend.rocksdb.memory.managed property even in version 1.12.3?
6. How do I decide when the job parallelism should be increased? Are there metrics which could give me a clue that the parallelism should be increased?

Best Regards,
Rado
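For concreteness, the daily cron-triggered savepoint described above might look roughly like the following; the pod name and <job-id> are placeholders, and the target directory matches the state.savepoints.dir configured above:

    # runs the Flink CLI inside the job manager pod and writes the
    # savepoint to the configured S3 savepoint directory
    kubectl exec flink-jobmanager-0 -- \
      /opt/flink/bin/flink savepoint <job-id> s3://bucket/savepoints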
Hi, Radoslav,
> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I should go with session mode and make sure that my flink cluster is running a single job?

Yes, you can achieve HA with per-job mode using ZooKeeper[1]. Looking at your configuration, you should also enable checkpointing[2] by setting execution.checkpointing.interval; checkpoints are triggered automatically and help the job resume after a failure.

> 3. Let's assume that savepoints should be triggered only before job update/deployment. How can I trigger a savepoint if my job is already consuming more than 80% of the allowed memory per pod in k8s? My observations show that k8s kills task managers (which are running as pods) and I need to retry it a couple of times.

I think with checkpointing enabled you no longer need to trigger savepoints manually under a specific condition, since checkpoints are taken periodically.

> 4. Should I consider upgrading to version 1.12.3?
> 5. Should I consider switching off state.backend.rocksdb.memory.managed property even in version 1.12.3?

I'm not an expert on the state backend, but it seems the fix for that issue is only applied to the docker image, so you could package a custom image yourself if you do not want to upgrade. However, if you are using the Native K8s mode[3] and there is no compatibility issue, it might be good to upgrade, because there are also lots of improvements[4] in 1.12.

> 6. How do I decide when the job parallelism should be increased? Are there some metrics which can lead me to a clue that the parallelism should be increased?

Since there are 6 Kafka sources in your job, the parallelism should first be aligned with the topic partition counts. For metrics, you can look at the backpressure of tasks and numRecordsOutPerSecond[5].

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/zookeeper_ha/
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
[4] https://issues.apache.org/jira/browse/FLINK-17709
[5] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#io

Best,
Yangze Guo
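As a rough sketch of the configuration Yangze suggests, periodic checkpoints and ZooKeeper HA could be expressed in flink-conf.yaml roughly like this (the ZooKeeper quorum, HA storage path, and cluster id below are placeholders, not values from this thread):

    execution.checkpointing.interval: 10min
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
    high-availability.storageDir: s3://bucket/ha
    high-availability.cluster-id: /data-enrichment-job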
Hi Yangze Guo,

Thanks for your reply.

> 1. Is it a good idea to have regular savepoints (say on a daily basis)?

I forgot to add the checkpoint configuration since it's part of a custom job configuration which is mounted in each pod. So checkpoints are enabled. :) That's why a savepoint is triggered on a daily basis, since the existing deployment setup has a single Job Manager. I will take a look at the k8s or ZooKeeper HA options.

> 3. Let's assume that savepoints should be triggered only before job update/deployment. How can I trigger a savepoint if my job is already consuming more than 80% of the allowed memory per pod in k8s? My observations show that k8s kills task managers (which are running as pods) and I need to retry it a couple of times.

Checkpoints are already enabled (once every 10 minutes). Once HA is set up correctly, I think savepoints can be used only when the job needs to be updated.

> 6. How do I decide when the job parallelism should be increased? Are there some metrics which can lead me to a clue that the parallelism should be increased?

Currently I am using a parallelism equal to the highest number of Kafka topic partitions. Unfortunately some of the topics have a higher load compared to others, and thus some of them have 1 partition while others have 4 partitions (for example).

Thanks,
Rado
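One way to handle the uneven partition counts mentioned above is to set the parallelism per source operator instead of relying only on parallelism.default. A minimal sketch for Flink 1.11 follows; the topic names, bootstrap servers, group id, and partition counts are made up for illustration:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class PerSourceParallelismSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
            props.setProperty("group.id", "enrichment-job");      // placeholder

            // A topic with 4 partitions gets 4 source subtasks ...
            DataStream<String> bigTopic = env
                    .addSource(new FlinkKafkaConsumer<>("big-topic", new SimpleStringSchema(), props))
                    .setParallelism(4);

            // ... while a 1-partition topic only needs a single subtask,
            // which avoids idle source instances.
            DataStream<String> smallTopic = env
                    .addSource(new FlinkKafkaConsumer<>("small-topic", new SimpleStringSchema(), props))
                    .setParallelism(1);

            // downstream operators (joins, CoProcessFunctions) keep the default parallelism
            bigTopic.union(smallTopic).print();

            env.execute("per-source parallelism sketch");
        }
    }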