This question is cross-posted on Stack Overflow: https://stackoverflow.com/questions/66968180/flink-exception-from-container-launch-exitcode-2. Viewing it on Stack Overflow is preferred, as I include a few images there for a better description.
Hi community,

## Flink (Scala) exitCode=2

I have a simple Flink job that reads from 2 columns of a Hive table `mysource`, adds up the columns, then writes the result to another Hive table `mysink`. `mysource` has 2 columns, `a bigint` and `b bigint`, and `mysink` has only 1 column, `c bigint`.

The job submits successfully; however, I observe it keeps retrying. (Screenshot of the repeated application attempts omitted; see the Stack Overflow post linked above.)

When I click into each attempt, it simply shows this:

```
AM Container for appattempt_1607399514900_2511_001267 exited with exitCode: 2
For more detailed output, check application tracking page:http://cn-hz-h-test-data-flink00:8088/cluster/app/application_1607399514900_2511Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e13_1607399514900_2511_1267_000001
Exit code: 2
Stack trace: ExitCodeException exitCode=2:
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
	at org.apache.hadoop.util.Shell.run(Shell.java:479)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
	at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Container exited with a non-zero exit code 2
Failing this attempt
```

However, the "Logs" page has no useful info: it complains about the logging libraries, but I believe those are really warnings, not errors.

```
LogType:jobmanager.err
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:1010
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/usercache/zhongtai/appcache/application_1607399514900_2509/filecache/10/featurepipelines-0.1.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/filecache/302/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
End of LogType:jobmanager.err

LogType:jobmanager.out
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:0
Log Contents:
End of LogType:jobmanager.out
```
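For reference, the two tables correspond to roughly the following DDL. This is a sketch based on the description above rather than the exact table definitions, and it reuses the `tableEnv` and Hive catalog configured in the job below, switched to the Hive dialect so the tables live in the Hive metastore.

```scala
// Sketch only: assumed schema-level DDL for the two Hive tables described above.
// Reuses the tableEnv / HiveCatalog set up in the job below.
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.executeSql("CREATE TABLE IF NOT EXISTS mysource (a BIGINT, b BIGINT)")
tableEnv.executeSql("CREATE TABLE IF NOT EXISTS mysink (c BIGINT)")
```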
This is the job written in Scala.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object HiveToyExample {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance.build
    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(execEnv, settings)

    val hiveCatalog = new HiveCatalog(
      "myhive",
      "aiinfra",
      "/data/apache/hive/apache-hive-2.1.0-bin/conf/"
    )
    tableEnv.registerCatalog("myhive", hiveCatalog)
    tableEnv.useCatalog("myhive")
    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    tableEnv
      .executeSql("""
                    |INSERT INTO mysink
                    |SELECT a + b
                    |FROM mysource
                    |""".stripMargin)
  }
}
```

Here's the pom.xml.

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>exmple</groupId>
    <artifactId>featurepipelines</artifactId>
    <version>0.1.1</version>
    <packaging>jar</packaging>
    <name>Feature Pipelines</name>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.12.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>false</shadedArtifactAttached>
                            <shadedClassifierName>Shade</shadedClassifierName>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.4.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

This is how I package the jar.

```
mvn clean package
```

This is how I run the job.

```
flink run \
--yarnname scalaflink-hive-test \
-m yarn-cluster \
-yarnqueue datadev \
--class featurepipelines.ingestion.HiveToyExample \
./featurepipelines-0.1.1.jar
```

## PyFlink rewrite works just fine?!

Since the logic is so simple, I rewrote the job with PyFlink to see what happens. Here is the PyFlink rewrite.

```python
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings = EnvironmentSettings.new_instance().use_blink_planner().build()
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(exec_env, environment_settings=settings)

# There exists such a jar in the path
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
)

catalog_name = "myhive"
default_database = "aiinfra"
hive_conf_dir = "/data/apache/hive/apache-hive-2.1.0-bin/conf/"

hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
t_env.register_catalog(catalog_name, hive_catalog)
t_env.use_catalog(catalog_name)

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```

This is how I run the PyFlink job.

```
flink run \
--yarnname pyflink-hive-test \
-m yarn-cluster \
-yD yarn.application.queue=tech_platform \
-pyarch pyflink1.12.0.zip \
-pyexec /data/software/pyflink1.12.0/bin/python \
-py /data/home/pal-flink/chenyisheng14418/feature-pipelines/pyflink/hive.py
```

Surprisingly, the job runs fine: it finishes quickly, with the result written to the `mysink` table.

## Why?

Given the comparison, I strongly suspect the Scala job fails because it is not packaged correctly, even though I follow the [Flink Docs](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven), as can be verified by looking at my pom.

> If you are building your own program, you need the following dependencies in your mvn file. It’s recommended not to include these dependencies in the resulting jar file. You’re supposed to add dependencies as stated above at runtime.
```xml
<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>
```

Also, I have included flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar in the /lib directory of my Flink distribution, as suggested in the [Flink docs](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#using-bundled-hive-jar):

> the recommended way to add dependency is to use a bundled jar. Separate jars should be used only if bundled jars don’t meet your needs.

What am I missing?

Best,
Yik San
Hi Yik San,

To me it looks as if there is a problem with the job and the deployment. Unfortunately, the logging does not seem to have worked. Could you check that you have a valid log4j.properties file in your conf directory?
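For reference, Flink 1.12 uses log4j2, and its `conf/log4j.properties` is written in log4j2's properties syntax. A minimal file along those lines might look like the sketch below. This is an illustration only, not the exact file shipped with the distribution; the `MainAppender` name and the pattern are assumptions.

```
# Minimal sketch of a Flink 1.12-style log4j2 properties file (assumed, not the shipped default).
# Logs at INFO into the file Flink passes via -Dlog.file.
rootLogger.level = INFO
rootLogger.appenderRef.main.ref = MainAppender

appender.main.name = MainAppender
appender.main.type = File
appender.main.append = false
appender.main.fileName = ${sys:log.file}
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```

With a valid config in place, the failing container's jobmanager.log should contain the actual reason for the exit.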
Cheers,
Till

On Wed, Apr 7, 2021 at 4:57 AM Yik San Chan <[hidden email]> wrote:
I actually think that the logging problem is caused by Hadoop 2.7.3, which pulls in slf4j-log4j12-1.7.10.jar. This binding is then used, but there is no proper configuration file for log4j 1.x, because Flink actually uses log4j2.
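For illustration (my reading of the pom and logs above, not something prescribed in this thread): the first of the three SLF4J bindings listed in the warning sits inside featurepipelines-0.1.1.jar itself, because the pom declares `slf4j-log4j12` without `provided` scope, so the log4j 1.x binding gets shaded into the job jar. One way to avoid bundling that extra binding would be to mark the dependency `provided` (or remove it):

```xml
<!-- Hypothetical tweak to the pom shown earlier (not from the thread):
     keep the log4j 1.x SLF4J binding out of the shaded job jar. -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
    <scope>provided</scope>
</dependency>
```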
Cheers,
Till

On Fri, Apr 9, 2021 at 12:05 PM Till Rohrmann <[hidden email]> wrote: