Flink memory consumption outside JVM on Kubernetes

Shashank Timmarajus
Hi All,

We have a Flink deployment that consumes messages from Kafka topics in Avro format, deserializes them, filters out null values, keys them by (topic, event time, partition), applies a 60-second time window, and finally aggregates the results in Avro and Parquet format and writes them to S3 buckets.
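
For reference, here is a minimal sketch of the shape of the job (illustration only; the Event type, EventAvroDeserializationSchema, the topic name, the mergeWith() aggregation and the output path are placeholders rather than the actual code):

import java.util.Properties;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class EventAggregationJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka:9092");  // placeholder
        kafkaProps.setProperty("group.id", "event-aggregator");     // placeholder

        // Consume Avro-encoded messages from Kafka and deserialize them.
        // Event and EventAvroDeserializationSchema stand in for the real types.
        DataStream<Event> events = env.addSource(
            new FlinkKafkaConsumer011<>("events", new EventAvroDeserializationSchema(), kafkaProps));

        events
            // Filter out records carrying null values.
            .filter(e -> e != null && e.getValue() != null)
            // Key by (topic, event time, partition).
            .keyBy(new KeySelector<Event, String>() {
                @Override
                public String getKey(Event e) {
                    return e.getTopic() + "|" + e.getEventTime() + "|" + e.getPartition();
                }
            })
            // 60-second window, then aggregate (event-time/watermark setup omitted here).
            .timeWindow(Time.seconds(60))
            .reduce((a, b) -> a.mergeWith(b))  // mergeWith() is a placeholder for the real aggregation
            // Write the aggregated output to S3 (the Avro/Parquet writers are omitted).
            .addSink(new BucketingSink<Event>("s3a://cdp-flink-dev/output"));  // placeholder path

        env.execute("EventAggregationJob");
    }
}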

My Flink chart looks like this:

[chart image not included]

I am running Flink with the following configuration:


jobmanager.rpc.address: flink-jobmanager.XCD.svc.cluster.local

jobmanager.rpc.port: 6123
jobmanager.heap.mb: 4096
jobmanager.web.port: 8081
jobmanager.web.upload.dir: /opt/flink/jars

taskmanager.data.port: 6121
taskmanager.rpc.port: 6122
taskmanager.heap.mb: 4096
taskmanager.numberOfTaskSlots: 4
#taskmanager.memory.preallocate: true
#taskmanager.memory.off-heap: true
taskmanager.debug.memory.startLogThread: true
taskmanager.debug.memory.logIntervalMs: 5000
#taskmanager.network.numberOfBuffers: 4096

blob.server.port: 6124
query.server.port: 6125

parallelism.default: 1
fs.hdfs.hadoopconf: /var/hadoop/conf/

state.backend: rocksdb
state.backend.rocksdb.checkpointdir: /rocksdb
state.checkpoints.dir: s3a://cdp-flink-dev/statebackend/checkpoints
state.savepoints.dir: s3a://cdp-flink-dev/statebackend/savepoints
state.backend.fs.checkpointdir: s3a://cdp-flink-dev/statebackend/rocksdb-state

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-0.zk.cdp-dev.svc.cluster.local:2181,zk-1.zk.cdp-dev.svc.cluster.local:2181,zk-2.zk.cdp-dev.svc.cluster.local:2181,zk-3.zk.cdp-dev.svc.cluster.local:2181,zk-4.zk.cdp-dev.svc.cluster.local:2181
high-availability.zookeeper.storageDir: s3a://cdp-flink-dev/zk
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.path.namespace: /cdpflinkdev
#high-availability.zookeeper.path.cluster-id: /zk-flink
high-availability.jobmanager.port: 50010,50011,50012

security.ssl.enabled: true
jobmanager.web.ssl.enabled: true
taskmanager.data.ssl.enabled: true
blob.service.ssl.enabled: true
akka.ssl.enabled: true

metrics.reporters: jmx
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.host: 127.0.0.1
metrics.reporter.jmx.port: 9999
metrics.scope.jm: flink.<host>.jobmanager
metrics.scope.jm.job: flink.<host>.jobmanager.<job_name>
metrics.scope.tm: flink.<host>.taskmanager.<tm_id>
metrics.scope.tm.job: flink.<host>.taskmanager.<tm_id>.<job_name>
metrics.scope.task: flink.<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
metrics.scope.operator: flink.<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>

security.ssl.enabled: true
security.ssl.keystore: /u01/app/cdp/var/secure/flinkidentity.jks
security.ssl.keystore-password: xyz
security.ssl.key-password: xyz
security.ssl.truststore: /u01/app/cdp/var/secure/flink_truststore.jks
security.ssl.truststore-password: xyz
security.ssl.verify-hostname: false

env.java.opts: -XX:GCTimeRatio=19 -XX:MinHeapFreeRatio=20 -XX:MaxHeapFreeRatio=30 -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap -XX:+UnlockDiagnosticVMOptions -XX:NativeMemoryTracking=summary -XX:+PrintNMTStatistics -Djava.util.logging.config.file=/opt/flink/conf/parquet.logging.properties -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.rmi.port=9999 -Djava.rmi.server.hostname=127.0.0.1

This setup seems to run fine on EC2 servers without any issues, but on Kubernetes the pods keep crashing (OOM-killed). I have tried making the JVM respect the cgroup memory limit (using the UseCGroupMemoryLimitForHeap flag), but to no avail. I have also tried changing the state backend from RocksDB to filesystem and finally disabling checkpointing altogether, but that still does not explain the OOM kills of the pods.
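
For reference, the heap ceiling the JVM actually chose and the direct/mapped byte-buffer pools (which live outside the heap but still count against the pod's memory limit) can be read from the standard platform MXBeans, which are also reachable over the JMX remote port configured in env.java.opts. A minimal sketch of that kind of check (the MemoryReport class is only an illustration; it reports on the JVM it runs in):

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.List;

public class MemoryReport {
    public static void main(String[] args) {
        MemoryMXBean mem = ManagementFactory.getMemoryMXBean();
        // Heap ceiling the JVM actually picked (shows whether the cgroup flag took effect).
        System.out.printf("heap max:      %d MB%n", mem.getHeapMemoryUsage().getMax() >> 20);
        System.out.printf("non-heap used: %d MB%n", mem.getNonHeapMemoryUsage().getUsed() >> 20);

        // Direct and mapped byte buffers are allocated outside the JVM heap
        // but still count against the container's memory limit.
        List<BufferPoolMXBean> pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            System.out.printf("%-6s buffers: count=%d, used=%d MB%n",
                pool.getName(), pool.getCount(), pool.getMemoryUsed() >> 20);
        }
    }
}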

My JMX metrics:

[JMX metrics screenshot not included]

Grafana pod metrics:

[Grafana pod metrics screenshot not included]

Apart from the env.java.opts and inspecting the memory, I couldn't really get to the root of this issue. Any suggestions or help would be appreciated. Thanks!

--
Regards
Shashank Surya