failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode


Dongwon Kim-2
Hi,

I have an artifact that works perfectly fine in Per-Job Cluster Mode, submitted with the following bash script:

#!/bin/env bash

export FLINK_CONF_DIR=./conf
export HADOOP_CLASSPATH=`hadoop classpath`

$FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf


I tried Application Mode [1] using the exact same artifact with the following script:

#!/bin/env bash

export FLINK_CONF_DIR=./conf
export HADOOP_CLASSPATH=`hadoop classpath`

$FLINK_HOME/bin/flink run-application -t yarn-application \
    -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
    -Dyarn.ship-files=myconf.conf \
    hdfs:///jars/myjar.jar myconf.conf


but the job fails with the following exception:

2020-12-16 15:52:25,364 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - session-window -> (Sink: kafka-sink, Sink: session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4) switched from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1128) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        ... 23 more


I have flink-connector-kafka_2.11 bundled in my artifact, and it is not present under the Flink lib directory at all.
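One way to double-check that no Kafka jar is pulled in via the provided lib dirs is to list them directly; a sketch using the HDFS paths from the command above:

hdfs dfs -ls hdfs:///flink-dists/flink-1.12.0/lib | grep -i kafka

An empty result would confirm that the Kafka classes can only come from the user jar.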

Thanks in advance,

P.S. Attached is the detailed log from a TM.

Dongwon


[Attachment: taskmanager_container_1600163418174_0105_01_000003_log.txt (85K)]

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

Dongwon Kim-2
I just added the following option to the script:

-Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization


Now it seems to work.
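For reference, the same option can also be set in conf/flink-conf.yaml rather than on the command line (a sketch of the equivalent entry, assuming default settings otherwise):

classloader.parent-first-patterns.additional: org.apache.kafka.common.serialization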

Why do Application Mode and Per-Job Cluster Mode behave differently when it comes to classloading?

Is this a bug, or is it intended?

Best,

Dongwon


Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

Dongwon Kim-2
Robert,

> But if Kafka is really only available in the user jar, then this error still should not occur.
I think so too; it should not occur.
I scanned through all the jar files on the classpath using `jar tf`, but no jar contains org.apache.kafka.common.serialization.Deserializer with a different version.
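A sketch of such a scan, with the jar directory as a placeholder:

for j in /path/to/classpath/*.jar; do
  # print the jar if it contains the class in question
  jar tf "$j" | grep -q 'org/apache/kafka/common/serialization/Deserializer.class' && echo "$j"
done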

> In your case it seems that the classes are loaded from different classloaders.
Hmm, then why did the artifact work fine with Per-Job Cluster Mode?


Thanks,

Dongwon



On Wed, Dec 16, 2020 at 4:45 PM Robert Metzger <[hidden email]> wrote:
Hey Dongwon,

I don't think this is the intended behavior. 
I believe in application mode, we are adding the user jar into the system classloader as well. In your case it seems that the classes are loaded from different classloaders.
But if Kafka is really only available in the user jar, then this error still should not occur.



Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

Yang Wang
Hi Dongwon,

For application mode, the job submission happens on the JobManager side; we use an embedded client to submit the job, so the user jar is added to the distributed cache. When a task is deployed to a TaskManager, the jar is downloaded again and its classes run in the user classloader, even though they are already on the system classpath.

I think this might be why these classes end up loaded by different classloaders.
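A minimal standalone sketch (not Flink code; the jar path is a placeholder) of how the same jar seen through two isolated classloaders produces exactly this "X is not an instance of Y" failure:

import java.net.URL;
import java.net.URLClassLoader;

public class LoaderDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder path to a kafka-clients jar.
        URL[] jar = { new URL("file:///tmp/kafka-clients.jar") };
        // Two isolated loaders (parent = null), standing in for the system
        // classpath copy and the user-classloader copy of the same classes.
        ClassLoader a = new URLClassLoader(jar, null);
        ClassLoader b = new URLClassLoader(jar, null);
        Class<?> impl  = a.loadClass("org.apache.kafka.common.serialization.ByteArraySerializer");
        Class<?> iface = b.loadClass("org.apache.kafka.common.serialization.Serializer");
        // impl implements loader a's Serializer, which is a different Class
        // object from loader b's Serializer, so this prints false:
        System.out.println(iface.isAssignableFrom(impl));
    }
}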

For per-job mode, we recover the job and the user jars are not added to the distributed cache.

Could you add "-Dyarn.per-job-cluster.include-user-jar=DISABLED" to your command and try again? This disables including the user jars in the system classpath.


Best,
Yang




Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

Dongwon Kim-2
Hi Yang,

Thanks for the detailed explanation!

Could you add "-Dyarn.per-job-cluster.include-user-jar=DISABLED" to your command and have a try? After that, we
will disable the user jars including in the system classpath.

I tried the following as you suggested:

#!/bin/env bash

export FLINK_CONF_DIR=./conf
export HADOOP_CLASSPATH=`hadoop classpath`

$FLINK_HOME/bin/flink run-application -t yarn-application \
    -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
    -Dyarn.ship-files=myconf.conf \
    -Dyarn.per-job-cluster.include-user-jar=DISABLED \
    hdfs:///jars/myjar.jar myconf.conf


Unfortunately, this attempt fails with the following exception on TMs:
2020-12-16 18:29:37,859 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - enricher (1/1)#0 (add478e602e93e1720a3d92ebbab5cc6) switched from RUNNING to FAILED.
java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.channel.EventLoopGroup
at io.lettuce.core.resource.DefaultEventLoopGroupProvider.getOrCreate(DefaultEventLoopGroupProvider.java:151) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at io.lettuce.core.resource.DefaultEventLoopGroupProvider.allocate(DefaultEventLoopGroupProvider.java:89) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at io.lettuce.core.AbstractRedisClient.doGetEventExecutor(AbstractRedisClient.java:275) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at io.lettuce.core.AbstractRedisClient.getEventLoopGroup(AbstractRedisClient.java:264) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at io.lettuce.core.AbstractRedisClient.channelType(AbstractRedisClient.java:246) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at io.lettuce.core.RedisClient.connectStatefulAsync(RedisClient.java:315) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at io.lettuce.core.RedisClient.connectStandaloneAsync(RedisClient.java:278) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at io.lettuce.core.RedisClient.connect(RedisClient.java:211) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at io.lettuce.core.RedisClient.connect(RedisClient.java:196) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at com.kakaomobility.drivinghabit.stream.Enricher.open(Enricher.java:55) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:154) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
Suppressed: java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.util.concurrent.EventExecutorGroup
at io.lettuce.core.resource.DefaultEventLoopGroupProvider.shutdown(DefaultEventLoopGroupProvider.java:292) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at io.lettuce.core.resource.DefaultClientResources.shutdown(DefaultClientResources.java:648) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at io.lettuce.core.AbstractRedisClient.closeClientResources(AbstractRedisClient.java:569) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at io.lettuce.core.AbstractRedisClient.lambda$shutdownAsync$5(AbstractRedisClient.java:521) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981) ~[?:1.8.0_222]
at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124) ~[?:1.8.0_222]
at io.lettuce.core.AbstractRedisClient.shutdownAsync(AbstractRedisClient.java:521) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at io.lettuce.core.AbstractRedisClient.shutdown(AbstractRedisClient.java:485) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at io.lettuce.core.AbstractRedisClient.shutdown(AbstractRedisClient.java:453) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at com.kakaomobility.drivinghabit.stream.Enricher.close(Enricher.java:60) ~[blob_p-ad05abf47895f79601474d976f6b3a3d57c22c87-cb151990c181788b52fc860c990e0c96:?]
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:740) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:720) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:643) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:552) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]

The exception seems to come from another operator, not the Kafka sink; this operator performs async I/O against Redis using Lettuce, an asynchronous Redis client.
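By analogy with the Kafka workaround earlier in this thread, one untested guess would be to extend the parent-first patterns to the colliding netty packages as well (the option takes a semicolon-separated list), e.g.:

    -Dclassloader.parent-first-patterns.additional='org.apache.kafka.common.serialization;io.netty'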

Best,

Dongwon


Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

Yang Wang
I am not sure about the root cause, but it seems that you could force the default NIO-based transport as a workaround [1].
Add -Denv.java.opts="-Dcom.datastax.driver.FORCE_NIO=true" to your submission command.
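Applied to the submission script from earlier in the thread, that suggestion would look roughly like this (a sketch, untested):

$FLINK_HOME/bin/flink run-application -t yarn-application \
    -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
    -Dyarn.ship-files=myconf.conf \
    -Denv.java.opts="-Dcom.datastax.driver.FORCE_NIO=true" \
    hdfs:///jars/myjar.jar myconf.conf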

Best,
Yang
