I currently have Flink set up with a job running on EMR, and I'm now trying to add monitoring by sending metrics to Prometheus.

I have come across an issue with running Flink on EMR. I'm using Terraform to provision EMR (I run Ansible afterwards to download and run a job). Out of the box, it does not look like EMR's Flink distribution includes the optional jars (flink-metrics-prometheus, flink-cep, etc.).

Flink's documentation says:

> "In order to use this reporter you must copy `/opt/flink-metrics-prometheus-1.6.1.jar` into the `/lib` folder of your Flink distribution"

https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter

But when I log into the EMR master node, neither /etc/flink nor /usr/lib/flink has a directory called `opt`, and I cannot find `flink-metrics-prometheus-1.6.1.jar` anywhere.

I know Flink has other optional libs, such as flink-cep, that you'd usually have to copy if you want to use them, but I'm not sure how to do this when using EMR.

This is the exception I get, which I believe is because Flink cannot find the metrics jar on its classpath.

```
java.lang.ClassNotFoundException: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.flink.runtime.metrics.MetricRegistryImpl.<init>(MetricRegistryImpl.java:144)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:419)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:276)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:227)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:191)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
    at org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint.main(YarnSessionClusterEntrypoint.java:137)
```

EMR resource in Terraform:

```
resource "aws_emr_cluster" "emr_flink" {
  name          = "ce-emr-flink-arn"
  release_label = "emr-5.20.0" # 5.21.0 is not found, could be a region thing
  applications  = ["Flink"]

  ec2_attributes {
    key_name                          = "ce_test"
    subnet_id                         = "${aws_subnet.ce_test_subnet_public.id}"
    instance_profile                  = "${aws_iam_instance_profile.emr_profile.arn}"
    emr_managed_master_security_group = "${aws_security_group.allow_all_vpc.id}"
    emr_managed_slave_security_group  = "${aws_security_group.allow_all_vpc.id}"
    additional_master_security_groups = "${aws_security_group.external_connectivity.id}"
    additional_slave_security_groups  = "${aws_security_group.external_connectivity.id}"
  }

  ebs_root_volume_size = 100
  master_instance_type = "m4.xlarge"
  core_instance_type   = "m4.xlarge"
  core_instance_count  = 2

  service_role = "${aws_iam_role.iam_emr_service_role.arn}"

  configurations_json = <<EOF
[
  {
    "Classification": "flink-conf",
    "Properties": {
      "parallelism.default": "8",
      "state.backend": "RocksDB",
      "state.backend.async": "true",
      "state.backend.incremental": "true",
      "state.savepoints.dir": "file:///savepoints",
      "state.checkpoints.dir": "file:///checkpoints",
      "web.submit.enable": "true",
      "metrics.reporter.promgateway.class": "org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter",
      "metrics.reporter.promgateway.host": "${aws_instance.monitoring.private_ip}",
      "metrics.reporter.promgateway.port": "9091",
      "metrics.reporter.promgateway.jobName": "ce-test",
      "metrics.reporter.promgateway.randomJobNameSuffix": "true",
      "metrics.reporter.promgateway.deleteOnShutdown": "false"
    }
  }
]
EOF
}
```

I suspect I may have to download the jar in the bootstrap stage, but I wanted to check this first and see if there are any examples of this being done.
Hi Jack
How about extracting flink-metrics-prometheus-1.6.1.jar from the distribution tarball downloaded from
https://archive.apache.org/dist/flink/flink-1.6.1/ and uploading it to `/usr/lib/flink/lib` on EMR?
Otherwise, I believe setting up a customized Flink cluster on EMR [1] should work if there is no more convenient solution.
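The first suggestion could be sketched roughly as below, as a small Python helper run on the master node once Flink is installed (for example from the Ansible step mentioned in the question). This is only an illustration under assumptions: the archive file name in `DIST_URL` is a guess at what sits under the directory linked above, the jar is assumed to live under `opt/` inside the tarball as the Flink documentation implies, and the function names are made up for this example. As far as I know, EMR bootstrap actions run before the applications are installed, so `/usr/lib/flink/lib` may not exist yet at that stage.

```python
import shutil
import tarfile
import urllib.request
from pathlib import Path

# Assumptions, not confirmed in this thread: the exact archive file name
# and the name of the jar inside it. Adjust both to your Flink version.
DIST_URL = ("https://archive.apache.org/dist/flink/flink-1.6.1/"
            "flink-1.6.1-bin-hadoop27-scala_2.11.tgz")
JAR_NAME = "flink-metrics-prometheus-1.6.1.jar"
FLINK_LIB = "/usr/lib/flink/lib"  # target directory named in the reply


def extract_jar(tar_path, jar_name, dest_dir):
    """Copy the first archive member whose base name is jar_name into dest_dir."""
    dest = Path(dest_dir) / jar_name
    with tarfile.open(tar_path, "r:*") as tar:
        for member in tar.getmembers():
            if member.isfile() and Path(member.name).name == jar_name:
                # Stream the single file out instead of unpacking everything.
                with tar.extractfile(member) as src, open(dest, "wb") as out:
                    shutil.copyfileobj(src, out)
                return dest
    raise FileNotFoundError(f"{jar_name} not found in {tar_path}")


def install_reporter(workdir="/tmp"):
    """Download the distribution and place the reporter jar in Flink's lib dir.

    Writing to FLINK_LIB will most likely need root privileges.
    """
    tar_path = Path(workdir) / "flink-dist.tgz"
    urllib.request.urlretrieve(DIST_URL, tar_path)
    return extract_jar(tar_path, JAR_NAME, FLINK_LIB)
```

Calling `install_reporter()` on the master node should leave the jar in `/usr/lib/flink/lib`; a Flink YARN session started after that would be expected to find the reporter class, though I have not verified this on EMR.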
Best
Yun Tang
From: Jack Tuck <[hidden email]>
Sent: Thursday, March 7, 2019 3:39
To: [hidden email]
Subject: How to monitor Apache Flink in AWS EMR (ElasticMapReduce)?