Hi,
I have an artifact which works perfectly fine in Per-Job Cluster Mode with the following bash script:

#!/bin/env bash
export FLINK_CONF_DIR=./conf
export HADOOP_CLASSPATH=`hadoop classpath`
$FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf

I tried Application Mode [1] using the exact same artifact with the following script:

#!/bin/env bash
export FLINK_CONF_DIR=./conf
export HADOOP_CLASSPATH=`hadoop classpath`
$FLINK_HOME/bin/flink run-application -t yarn-application \
  -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
  -Dyarn.ship-files=myconf.conf \
  hdfs:///jars/myjar.jar myconf.conf

but the job fails with the following exception:

2020-12-16 15:52:25,364 WARN org.apache.flink.runtime.taskmanager.Task [] - session-window -> (Sink: kafka-sink, Sink: session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4) switched from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1128) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
    ... 23 more

I have flink-connector-kafka_2.11 bundled in my artifact, and it is not under the Flink lib directory at all.

Thanks in advance,

Dongwon

p.s. The attachment is the detailed log from a TM: taskmanager_container_1600163418174_0105_01_000003_log.txt (85K)
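For context on the "is not an instance of" message: in the JVM, a class's identity is the pair (defining classloader, class name), so the same class file defined by two different loaders yields two distinct, assignment-incompatible Class objects. Below is a minimal, self-contained Java sketch (illustrative only, not code from this thread) that reproduces the symptom with a child-first loader like Flink's user-code classloader:

import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderClashDemo {

    // A child-first loader: tries to define the class itself before delegating to the parent,
    // mimicking Flink's default child-first user-code classloading.
    static final class ChildFirstLoader extends URLClassLoader {
        ChildFirstLoader(URL[] urls, ClassLoader parent) {
            super(urls, parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                Class<?> loaded = findLoadedClass(name);
                if (loaded == null) {
                    try {
                        loaded = findClass(name); // load from our own URLs first
                    } catch (ClassNotFoundException e) {
                        loaded = super.loadClass(name, resolve); // fall back to the parent
                    }
                }
                if (resolve) {
                    resolveClass(loaded);
                }
                return loaded;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Point the child-first loader at the same location this class was loaded from
        // (may be null in exotic setups; fine for a plain java invocation).
        URL here = ClassLoaderClashDemo.class.getProtectionDomain().getCodeSource().getLocation();
        ClassLoader childFirst =
                new ChildFirstLoader(new URL[] {here}, ClassLoaderClashDemo.class.getClassLoader());

        Class<?> parentCopy = Class.forName(ClassLoaderClashDemo.class.getName());
        Class<?> childCopy = Class.forName(ClassLoaderClashDemo.class.getName(), true, childFirst);

        // Same fully-qualified name, different Class objects -> instanceof/cast checks fail.
        System.out.println("same name:  " + parentCopy.getName().equals(childCopy.getName())); // true
        System.out.println("same class: " + (parentCopy == childCopy));                        // false
        System.out.println("assignable: " + parentCopy.isAssignableFrom(childCopy));           // false
    }
}

This is exactly the shape of the Kafka failure above: one copy of the serialization classes on the system classpath, another in the user jar loaded child-first.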
I just added the following option to the script:

-Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization

Now it seems to work. Why do Application Mode and Per-Job Cluster Mode behave differently when it comes to classloading? Is it a bug, or intended?

Best,

Dongwon

On Wed, Dec 16, 2020 at 3:59 PM Dongwon Kim <[hidden email]> wrote:
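The option works because Flink's child-first user-code classloader delegates any class matching a parent-first pattern to the parent, so both the connector and kafka-clients resolve the serialization classes to the single copy on the system classpath. One way to observe the effect at runtime is to log which classloader actually serves the class; a hypothetical diagnostic sketch (assumes kafka-clients is on the classpath; in a real job this would run inside an operator's open() rather than main()):

import org.apache.kafka.common.serialization.ByteArraySerializer;

public class LoaderProbe {
    public static void main(String[] args) throws Exception {
        // Resolve the class through the context classloader, roughly as Kafka's
        // AbstractConfig.getConfiguredInstance does when building a producer.
        Class<?> viaContext = Class.forName(
                "org.apache.kafka.common.serialization.ByteArraySerializer",
                false,
                Thread.currentThread().getContextClassLoader());

        // The class as seen by code compiled against kafka-clients directly.
        Class<?> viaCompile = ByteArraySerializer.class;

        System.out.println("context loader: " + viaContext.getClassLoader());
        System.out.println("compile loader: " + viaCompile.getClassLoader());
        // If these differ, instanceof checks across the two copies will fail.
        System.out.println("same class: " + (viaContext == viaCompile));
    }
}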
Robert,

> But if Kafka is really only available in the user jar, then this error still should not occur.

I think so too; it should not occur. I scanned through all the jar files on the classpath using `jar tf`, but no jar contains org.apache.kafka.common.serialization.Deserializer with a different version.

> In your case it seems that the classes are loaded from different classloaders.

Hmm, then why did the artifact work fine with per-job cluster mode?

p.s. Another user seems to be facing the same problem: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Duplication-error-on-Kafka-Connector-Libraries-td39805.html#a39812

Thanks,

Dongwon

On Wed, Dec 16, 2020 at 4:45 PM Robert Metzger <[hidden email]> wrote:
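As an aside, the `jar tf` sweep can be automated; a hypothetical Java equivalent (class name and program name are illustrative) that walks the JVM classpath and reports every jar bundling the class in question:

import java.io.File;
import java.util.jar.JarFile;

public class DuplicateClassScan {
    public static void main(String[] args) throws Exception {
        // The class whose duplicates we are hunting for, in resource-path form.
        String needle = "org/apache/kafka/common/serialization/Deserializer.class";
        for (String entry : System.getProperty("java.class.path").split(File.pathSeparator)) {
            if (!entry.endsWith(".jar")) continue; // skip directories
            try (JarFile jar = new JarFile(entry)) {
                if (jar.getEntry(needle) != null) {
                    System.out.println(needle + " found in " + entry);
                }
            }
        }
    }
}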
Hi Dongwon,

In Application Mode, the user jar is used to submit the job, so it will be added to the distributed cache. When a task is deployed to a TaskManager, the jar is downloaded again and loaded by the user classloader even though it is already on the system classpath. I think this might be why these classes are loaded by different classloaders.

For per-job mode, we are recovering the job, and the user jars will not be added to the distributed cache.

Could you add "-Dyarn.per-job-cluster.include-user-jar=DISABLED" to your command and have a try? That will stop the user jar from being included in the system classpath.

Best,

Yang

On Wed, Dec 16, 2020 at 4:20 PM, Dongwon Kim <[hidden email]> wrote:
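If this explanation holds, the user jar should show up on the TaskManager's system classpath in Application Mode, and disappear from it once inclusion is disabled. A hypothetical probe (the "myjar" substring is illustrative) that could be logged from inside the job to verify:

import java.io.File;

public class ClasspathProbe {
    public static void main(String[] args) {
        String userJarMarker = "myjar"; // illustrative: a substring of the user jar's name
        for (String entry : System.getProperty("java.class.path").split(File.pathSeparator)) {
            if (entry.contains(userJarMarker)) {
                System.out.println("user jar on system classpath: " + entry);
            }
        }
    }
}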
Hi Yang,

Thanks for the detailed explanation!

> Could you add "-Dyarn.per-job-cluster.include-user-jar=DISABLED" to your command and have a try?

I tried the following as you suggested:

#!/bin/env bash
export FLINK_CONF_DIR=./conf
export HADOOP_CLASSPATH=`hadoop classpath`
$FLINK_HOME/bin/flink run-application -t yarn-application \
  -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
  -Dyarn.ship-files=myconf.conf \
  -Dyarn.per-job-cluster.include-user-jar=DISABLED \
  hdfs:///jars/myjar.jar myconf.conf

Unfortunately, this attempt fails with the following exception on TMs:

2020-12-16 18:29:37,859 WARN org.apache.flink.runtime.taskmanager.Task [] - enricher (1/1)#0 (add478e602e93e1720a3d92ebbab5cc6) switched from RUNNING to FAILED.

The exception seems to come from another operator, not Kafka; this operator performs async I/O using Lettuce, an async Redis client API.

Best,

Dongwon

On Wed, Dec 16, 2020 at 6:07 PM Yang Wang <[hidden email]> wrote:
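For readers unfamiliar with the setup: the failing operator is not shown in the thread, but an async-I/O operator backed by Lettuce typically has roughly the following shape. This is a hypothetical sketch (class name, Redis address, and key/value types are all illustrative, not the thread's actual code):

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;

public class RedisEnricher extends RichAsyncFunction<String, String> {
    private transient RedisClient client;
    private transient StatefulRedisConnection<String, String> connection;

    @Override
    public void open(Configuration parameters) {
        client = RedisClient.create("redis://localhost:6379"); // illustrative address
        connection = client.connect();
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        // Lettuce's RedisFuture implements CompletionStage, so we can chain on it.
        connection.async().get(key).whenComplete((value, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singletonList(value == null ? "" : value));
            }
        });
    }

    @Override
    public void close() {
        if (connection != null) connection.close();
        if (client != null) client.shutdown();
    }
}

Such a function would be wired into the pipeline with AsyncDataStream.unorderedWait(stream, new RedisEnricher(), timeout, TimeUnit.MILLISECONDS).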
I am not sure about the root cause, but it seems that you could force the default NIO-based transport as a workaround [1]. Add

-Denv.java.opts="-Dcom.datastax.driver.FORCE_NIO=true"

to your submission command.

Best,

Yang

On Wed, Dec 16, 2020 at 5:37 PM, Dongwon Kim <[hidden email]> wrote:
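Since env.java.opts applies to all Flink JVM processes, a hypothetical sanity check (not from this thread) is to print the property from inside a task to confirm the flag actually reached the TaskManager JVMs:

public class ForceNioCheck {
    public static void main(String[] args) {
        // Expect "true" if env.java.opts was applied to this JVM.
        System.out.println("com.datastax.driver.FORCE_NIO="
                + System.getProperty("com.datastax.driver.FORCE_NIO"));
    }
}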