Flink: Exception from container-launch exitCode=2

Flink: Exception from container-launch exitCode=2

Yik San Chan
The question is cross-posted on Stack Overflow: https://stackoverflow.com/questions/66968180/flink-exception-from-container-launch-exitcode-2. Viewing the question on Stack Overflow is preferred, as I include a few images for a better description.

Hi community,

## Flink (Scala) exitCode=2

I have a simple Flink job that reads the 2 columns of a Hive table `mysource`, adds them up, and writes the result to another Hive table `mysink`. `mysource` has 2 columns, `a bigint` and `b bigint`, and `mysink` has only 1 column, `c bigint`.
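For reference, the tables look roughly like this (simplified DDL; storage format and other table properties omitted):

```sql
-- Simplified sketch of the two Hive tables described above.
CREATE TABLE mysource (
  a BIGINT,
  b BIGINT
);

CREATE TABLE mysink (
  c BIGINT
);
```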

The job submits successfully; however, I observe that it keeps retrying.

[Screenshot of the repeatedly retrying YARN application attempts; see the Stack Overflow post linked above.]

When I click into each attempt, it simply shows this:

```
AM Container for appattempt_1607399514900_2511_001267 exited with exitCode: 2
For more detailed output, check application tracking page:http://cn-hz-h-test-data-flink00:8088/cluster/app/application_1607399514900_2511Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e13_1607399514900_2511_1267_000001
Exit code: 2
Stack trace: ExitCodeException exitCode=2:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 2
Failing this attempt
```

However, the "Logs" has no useful info - it complains about the logging lib, but I believe they are really warnings, not errors.

```
LogType:jobmanager.err
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:1010
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/usercache/zhongtai/appcache/application_1607399514900_2509/filecache/10/featurepipelines-0.1.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]/
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/filecache/302/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
End of LogType:jobmanager.err

LogType:jobmanager.out
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:0
Log Contents:
End of LogType:jobmanager.out
```
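For completeness, the full aggregated container logs can also be pulled straight from YARN (standard YARN tooling, nothing Flink-specific):

```
# Fetch all container logs for the failing application from YARN.
yarn logs -applicationId application_1607399514900_2511
```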

This is the job written in Scala.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object HiveToyExample {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance.build
    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(execEnv, settings)

    val hiveCatalog = new HiveCatalog(
      "myhive",
      "aiinfra",
      "/data/apache/hive/apache-hive-2.1.0-bin/conf/"
    )
    tableEnv.registerCatalog("myhive", hiveCatalog)
    tableEnv.useCatalog("myhive")

    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    tableEnv
      .executeSql("""
          |INSERT INTO mysink
          |SELECT a + b
          |FROM mysource
          |""".stripMargin)
  }
}
```

Here's the pom.xml.

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>exmple</groupId>
  <artifactId>featurepipelines</artifactId>
  <version>0.1.1</version>
  <packaging>jar</packaging>

  <name>Feature Pipelines</name>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.12.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <scala.version>2.11.12</scala.version>
    <log4j.version>2.12.1</log4j.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>2.1.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
    </dependency>
  </dependencies>

  <build>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
        <filtering>true</filtering>
      </resource>
    </resources>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <shadedArtifactAttached>false</shadedArtifactAttached>
              <shadedClassifierName>Shade</shadedClassifierName>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>4.4.1</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
```

This is how I package the jar.

```
mvn clean package
```
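For what it's worth, the shaded jar can be inspected with standard JDK tooling, e.g. to confirm that the entry class is there and to see whether an SLF4J binding got bundled:

```
# List the shaded jar and look for the job's entry class and any SLF4J binding.
jar tf featurepipelines-0.1.1.jar | grep -E 'HiveToyExample|StaticLoggerBinder'
```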

This is how I run the job.

```
flink run \
--yarnname scalaflink-hive-test \
-m yarn-cluster \
-yarnqueue datadev \
--class featurepipelines.ingestion.HiveToyExample \
./featurepipelines-0.1.1.jar
```

## PyFlink rewrite works just fine?!

Since the logic is so simple, I rewrote the job in PyFlink to see what happens. Here is the PyFlink rewrite.

```python
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings = EnvironmentSettings.new_instance().use_blink_planner().build()
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(exec_env, environment_settings=settings)

# The connector jar exists at this path
t_env.get_config().get_configuration().set_string(
    "pipeline.jars", f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
)

catalog_name = "myhive"
default_database = "aiinfra"
hive_conf_dir = "/data/apache/hive/apache-hive-2.1.0-bin/conf/"

hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
t_env.register_catalog(catalog_name, hive_catalog)
t_env.use_catalog(catalog_name)

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```

This is how I run the PyFlink job.

```
flink run \
--yarnname pyflink-hive-test \
-m yarn-cluster \
-yD yarn.application.queue=tech_platform \
-pyarch pyflink1.12.0.zip \
-pyexec /data/software/pyflink1.12.0/bin/python \
-py /data/home/pal-flink/chenyisheng14418/feature-pipelines/pyflink/hive.py
```

Surprisingly, the job runs fine - it finishes quickly, with the result written to the `mysink` table.

## Why?

Given the comparison, I strongly suspect that the Scala job fails because it is not packaged correctly, even though I follow the [Flink docs](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven), as can be verified by looking at my pom.

> If you are building your own program, you need the following dependencies in your mvn file. It’s recommended not to include these dependencies in the resulting jar file. You’re supposed to add dependencies as stated above at runtime.

```
<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>
```

Also, I have included flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar in the /lib directory of my Flink distribution, as suggested in the [Flink docs](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#using-bundled-hive-jar):

> the recommended way to add dependency is to use a bundled jar. Separate jars should be used only if bundled jars don’t meet your needs.
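The presence of the bundled jar can be double-checked with a quick listing (assuming `FLINK_HOME` points at the distribution in use):

```
# Verify the bundled Hive connector sits in the distribution's lib directory.
ls $FLINK_HOME/lib | grep hive
```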

What am I missing?

Best,

Re: Flink: Exception from container-launch exitCode=2

Till Rohrmann
Hi Yik San,

to me it looks as if there is a problem with the job and the deployment. Unfortunately, the logging seems not to have worked. Could you check that you have a valid log4j.properties file in your conf directory?
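For reference, a minimal console-only log4j.properties in the log4j2 properties format that Flink 1.12 reads from its conf directory could look roughly like this (a sketch, not the full default file shipped with the distribution):

```
# Minimal log4j2-style configuration sketch for Flink 1.12's conf/log4j.properties.
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```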

Cheers,
Till


Re: Flink: Exception from container-launch exitCode=2

Till Rohrmann
I actually think that the logging problem is caused by Hadoop 2.7.3, which pulls in slf4j-log4j12-1.7.10.jar. This binding is then used, but there is no proper configuration file for log4j, because Flink actually uses log4j2.
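As a side note (an assumption on my part, not something verified in this thread): the first binding in the SLF4J warning above comes from the shaded user jar itself, since the pom declares slf4j-log4j12 without a scope. Marking it provided (or removing it) would at least keep that copy out of featurepipelines-0.1.1.jar:

```xml
<!-- Sketch (assumption): keep the log4j 1.x binding out of the shaded user jar
     so it cannot shadow Flink's own log4j2 setup. -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-log4j12</artifactId>
  <version>1.7.7</version>
  <scope>provided</scope>
</dependency>
```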

Cheers,
Till
