How to monitor Apache Flink in AWS EMR (ElasticMapReduce)?

Jack Tuck

I currently have Flink set up with a job running on EMR, and I'm now trying to add monitoring by sending metrics to Prometheus.

 

I have come across an issue with running Flink on EMR. I'm using Terraform to provision EMR (I run Ansible afterwards to download and run a job). Out of the box, it does not look like EMR's Flink distribution includes the optional jars (flink-metrics-prometheus, flink-cep, etc.).

 

Flink's documentation says:

> "In order to use this reporter you must copy `/opt/flink-metrics-prometheus-1.6.1.jar` into the `/lib` folder of your Flink distribution"

https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter

 

But when I log into the EMR master node, neither /etc/flink nor /usr/lib/flink has a directory called `opt`, and I cannot see `flink-metrics-prometheus-1.6.1.jar` anywhere.

 

I know Flink has other optional libs you'd usually have to copy if you want to use them such as flink-cep, but I'm not sure how to do this when using EMR.

 

This is the exception I get, which I believe is because Flink cannot find the metrics jar on its classpath.

```
java.lang.ClassNotFoundException: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.flink.runtime.metrics.MetricRegistryImpl.<init>(MetricRegistryImpl.java:144)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:419)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:276)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:227)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:191)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
    at org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint.main(YarnSessionClusterEntrypoint.java:137)
```
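One way to confirm that the ClassNotFoundException above really comes from a missing jar is to scan the jars in Flink's `lib` directory for the reporter class. The following is only a rough sketch: the helper name `jars_containing` is made up for illustration, and the directory to scan (for example `/usr/lib/flink/lib`) is an assumption about where EMR puts Flink's libraries.

```python
import zipfile
from pathlib import Path

# Class named in the ClassNotFoundException.
REPORTER_CLASS = "org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter"


def jars_containing(lib_dir, class_name=REPORTER_CLASS):
    """Return the names of the jars in lib_dir that contain class_name.

    Hypothetical helper for illustration; it is not part of Flink or EMR.
    """
    # A class a.b.C is stored inside a jar as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    matches = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        with zipfile.ZipFile(jar) as zf:
            if entry in zf.namelist():
                matches.append(jar.name)
    return matches
```

Calling `jars_containing("/usr/lib/flink/lib")` on the master node returns an empty list when none of the jars in that directory provides the reporter, which would be consistent with the exception.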

 

EMR resource in Terraform:

```
resource "aws_emr_cluster" "emr_flink" {
  name          = "ce-emr-flink-arn"
  release_label = "emr-5.20.0" # 5.21.0 is not found, could be a region thing
  applications  = ["Flink"]

  ec2_attributes {
    key_name                          = "ce_test"
    subnet_id                         = "${aws_subnet.ce_test_subnet_public.id}"
    instance_profile                  = "${aws_iam_instance_profile.emr_profile.arn}"
    emr_managed_master_security_group = "${aws_security_group.allow_all_vpc.id}"
    emr_managed_slave_security_group  = "${aws_security_group.allow_all_vpc.id}"
    additional_master_security_groups = "${aws_security_group.external_connectivity.id}"
    additional_slave_security_groups  = "${aws_security_group.external_connectivity.id}"
  }

  ebs_root_volume_size = 100
  master_instance_type = "m4.xlarge"
  core_instance_type   = "m4.xlarge"
  core_instance_count  = 2

  service_role = "${aws_iam_role.iam_emr_service_role.arn}"

  configurations_json = <<EOF
[
  {
    "Classification": "flink-conf",
    "Properties": {
        "parallelism.default": "8",
        "state.backend": "RocksDB",
        "state.backend.async": "true",
        "state.backend.incremental": "true",
        "state.savepoints.dir": "file:///savepoints",
        "state.checkpoints.dir": "file:///checkpoints",
        "web.submit.enable": "true",
        "metrics.reporter.promgateway.class": "org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter",
        "metrics.reporter.promgateway.host": "${aws_instance.monitoring.private_ip}",
        "metrics.reporter.promgateway.port": "9091",
        "metrics.reporter.promgateway.jobName": "ce-test",
        "metrics.reporter.promgateway.randomJobNameSuffix": "true",
        "metrics.reporter.promgateway.deleteOnShutdown": "false"
    }
  }
]
EOF
}
```

 

I suspect I may have to download the jar in the bootstrap stage, but I wanted to check first and see whether there are any examples of this being done.

Re: How to monitor Apache Flink in AWS EMR (ElasticMapReduce)?

Yun Tang
In reply to this post by Jack Tuck
Hi Jack

How about extracting flink-metrics-prometheus-1.6.1.jar from the distribution tarball downloaded from https://archive.apache.org/dist/flink/flink-1.6.1/ and uploading it to `/usr/lib/flink/lib` on EMR?

Otherwise, I believe setting up a customized Flink cluster on EMR [1] should work if there is no more convenient solution.
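The first suggestion could be scripted roughly as below. This is a sketch under assumptions rather than a tested EMR recipe: it assumes a Flink 1.6.1 distribution tarball has already been downloaded from the archive URL above, that the optional jar sits under an `opt/` directory inside it (as the Flink documentation quoted in the question implies), and that the target is `/usr/lib/flink/lib`. The function name `install_optional_jar` is hypothetical.

```python
import shutil
import tarfile
from pathlib import Path


def install_optional_jar(tarball, jar_name, lib_dir):
    """Copy .../opt/<jar_name> out of a Flink distribution tarball into lib_dir.

    Hypothetical helper for illustration; returns the path of the copied jar.
    """
    lib_dir = Path(lib_dir)
    lib_dir.mkdir(parents=True, exist_ok=True)
    # Mode "r" (the default) lets tarfile detect the compression itself.
    with tarfile.open(tarball) as tar:
        for member in tar.getmembers():
            parts = Path(member.name).parts
            # Match only a regular file whose last two path components
            # are opt/<jar_name>, whatever the top-level directory is called.
            if member.isfile() and parts[-2:] == ("opt", jar_name):
                target = lib_dir / jar_name
                with tar.extractfile(member) as src, open(target, "wb") as dst:
                    shutil.copyfileobj(src, dst)
                return target
    raise FileNotFoundError(f"{jar_name} not found under opt/ in {tarball}")
```

For example, `install_optional_jar(path_to_tarball, "flink-metrics-prometheus-1.6.1.jar", "/usr/lib/flink/lib")`, where `path_to_tarball` is wherever the download was saved. Writing to `/usr/lib/flink/lib` will most likely need elevated permissions, and the Flink session has to be restarted before the new jar is picked up.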



Best
Yun Tang

From: Jack Tuck <[hidden email]>
Sent: Thursday, March 7, 2019 3:39
To: [hidden email]
Subject: How to monitor Apache Flink in AWS EMR (ElasticMapReduce)?
 
