classpath issue on yarn

aris kol
Hi guys,

I ran into a weird classpath issue while running a streaming job on a YARN cluster.
I have a relatively simple flow that reads data from Kafka, does a few manipulations, and then indexes the results in Elasticsearch (2.3).

I use the elasticsearch2 connector (1.1-SNAPSHOT) (bleeding edge, I know).

The stream works fine on a local Flink node (1.0.2), reading from a remote Kafka and writing to a remote ES.
However, when deployed to the remote YARN cluster (again, Flink 1.0.2), it throws the following exception:
```
04/26/2016 10:07:30 Source: Custom Source -> Flat Map -> Sink: Unnamed(1/8) switched to FAILED 
java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.java:190)
at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

04/26/2016 10:07:30 Job execution switched to status FAILING.
java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.java:190)
at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
04/26/2016 10:07:30 Source: Custom Source -> Flat Map -> Sink: Unnamed(7/8) switched to FAILED 
java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.threadpool.ThreadPool
at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
```

I rebuilt the fat jar (I use sbt) many times, and it contains no trace of the old Guava `MoreExecutors` class, the one lacking the `directExecutor` method the transport client needs.
Unfortunately, `lib/flink-dist_2.11-1.0.2.jar` contains both: the newer class from Guava 18 and an old one, probably pulled in by some ancient Hadoop dependency. For some reason the old version takes precedence.
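One way to confirm which copy wins at runtime is to ask the JVM where it actually loaded `MoreExecutors` from, using the same classloader the job uses. The sketch below is a hypothetical helper (`ClassProbe` is not part of Flink or Guava) built only on standard reflection (`Class.forName`, `getProtectionDomain.getCodeSource`, `getMethods`). If it is compiled into the job jar, its default classloader is the one the job's own classes are loaded with; classes coming from the JDK bootstrap loader report no code source and come back as `None`.

```scala
import scala.util.Try

// Hypothetical diagnostic helper (not part of Flink or Guava): reports where
// the JVM loaded a class from and whether that copy has a given public method.
object ClassProbe {
  private def load(className: String, loader: ClassLoader): Option[Class[_]] =
    // initialize = false: locate the class without running its static initializer
    Try(Class.forName(className, false, loader)).toOption

  /** URL of the jar or directory the class came from; None if the class is
    * missing or has no code source (e.g. JDK bootstrap classes). */
  def locationOf(className: String,
                 loader: ClassLoader = getClass.getClassLoader): Option[String] =
    load(className, loader)
      .flatMap(c => Option(c.getProtectionDomain))
      .flatMap(pd => Option(pd.getCodeSource))
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)

  /** True if the loaded copy declares or inherits a public method with this name. */
  def hasPublicMethod(className: String, method: String,
                      loader: ClassLoader = getClass.getClassLoader): Boolean =
    load(className, loader).exists(_.getMethods.exists(_.getName == method))

  def main(args: Array[String]): Unit = {
    val guava = "com.google.common.util.concurrent.MoreExecutors"
    println(s"loaded from:        ${locationOf(guava).getOrElse("<not found>")}")
    println(s"has directExecutor: ${hasPublicMethod(guava, "directExecutor")}")
  }
}
```

Run in the failing environment, `has directExecutor: false` together with the reported location would point at the jar that supplies the old Guava.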

In Spark, I used to set `spark.driver.userClassPathFirst` to `true`, which usually dealt with these problems. Is there anything similar in Flink?
Any ideas?

Thanks,
Aris

Re: classpath issue on yarn

rmetzger0
Hi Aris,

Did you build the 1.0.2 flink-dist yourself?
If not, which exact version did you download?



On Tue, Apr 26, 2016 at 12:28 PM, aris kol <[hidden email]> wrote:


RE: classpath issue on yarn

aris kol
Hi Robert,

Thank you for your prompt response.
No, I downloaded it from an Apache mirror.
I think YARN loads the Hadoop universe before the user classpath by default, so I reckon I would get this exception even without Flink in the middle.
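To see the order in which the JVM searches its classpath, and where any Guava jars sit in it, one could list the `java.class.path` entries whose file name mentions Guava. This is a hypothetical helper and a rough heuristic: it matches on file names only, and jars loaded through a separate classloader (such as the one Flink uses for the user jar) do not appear in `java.class.path` at all.

```scala
import java.io.File

// Hypothetical helper: classpath entries whose file name mentions "guava",
// paired with their position (the system class loader searches entries in order).
object ClasspathGuava {
  def guavaEntries(classpath: String = System.getProperty("java.class.path", "")): Seq[(Int, String)] =
    classpath
      .split(File.pathSeparator)
      .toSeq
      .filter(_.nonEmpty)
      .zipWithIndex
      .collect { case (entry, i) if new File(entry).getName.toLowerCase.contains("guava") => (i, entry) }

  def main(args: Array[String]): Unit =
    guavaEntries().foreach { case (i, entry) => println(f"$i%3d  $entry") }
}
```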
I can still see both the old and the new `MoreExecutors` class in flink-dist
(the old one under `org/apache/flink/hadoop/shaded/com/google/common/util/concurrent`,
the new one under `org/apache/flink/shaded/com/google/common/util/concurrent`).
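Copies like these can be listed straight from a jar without unzipping it. A minimal sketch using `java.util.jar.JarFile`; `JarScan` and `findClassEntries` are hypothetical names. It matches on the simple class name, so it finds `MoreExecutors.class` under any package prefix, relocated or not, but skips inner classes such as `MoreExecutors$1.class`.

```scala
import java.util.jar.JarFile
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: every entry in a jar whose simple class name matches,
// e.g. to spot several (possibly shaded/relocated) copies of MoreExecutors.
object JarScan {
  def findClassEntries(jarPath: String, simpleName: String): List[String] = {
    val jar = new JarFile(jarPath)
    try {
      val fileName = simpleName + ".class"
      val hits = ArrayBuffer.empty[String]
      val entries = jar.entries()
      while (entries.hasMoreElements) {
        val name = entries.nextElement().getName
        // match "<any/package/>MoreExecutors.class" exactly, not inner classes
        if (name == fileName || name.endsWith("/" + fileName)) hits += name
      }
      hits.sorted.toList
    } finally jar.close()
  }

  def main(args: Array[String]): Unit =
    // e.g. args = Array("lib/flink-dist_2.11-1.0.2.jar")
    findClassEntries(args(0), "MoreExecutors").foreach(println)
}
```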
I reckon I should try shading Guava on my side, but the shading support in sbt-assembly is quite new.
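With sbt-assembly, shading is configured through `assemblyShadeRules`. A sketch of a `build.sbt` fragment, assuming a version of sbt-assembly with shading support (0.14.0 or later); the target prefix `myjob.shaded.guava` is an arbitrary placeholder. `.inAll` applies the rename to the project's own classes and to all bundled dependencies, so the Elasticsearch client's references to Guava are rewritten too.

```scala
// build.sbt fragment (sketch; assumes sbt-assembly with ShadeRule support).
// Relocate the Guava classes bundled in the fat jar and rewrite every
// reference to them, so com.google.common.* on the cluster classpath
// can no longer shadow the copy the job was built against.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "myjob.shaded.guava.@1").inAll
)
```

On newer sbt versions the slash form `assembly / assemblyShadeRules` is the preferred spelling of the same key.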

I will try it and report back.

Thanks,
Aris




From: [hidden email]
Date: Tue, 26 Apr 2016 18:42:31 +0200
Subject: Re: classpath issue on yarn
To: [hidden email]



RE: classpath issue on yarn

aris kol
So,
I shaded Guava.
The whole thing works fine locally (standalone local Flink), but on YARN (I forgot to mention it runs on EMR) I get the following:
```
org.apache.flink.client.program.ProgramInvocationException: Unknown I/O error while extracting contained jar files.
at org.apache.flink.client.program.PackagedProgram.extractContainedLibaries(PackagedProgram.java:729)
at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:192)
at org.apache.flink.client.CliFrontend.buildProgram(CliFrontend.java:922)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:301)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: java.util.zip.ZipException: error in opening zip file
at java.util.zip.ZipFile.open(Native Method)
at java.util.zip.ZipFile.<init>(ZipFile.java:219)
at java.util.zip.ZipFile.<init>(ZipFile.java:149)
at java.util.jar.JarFile.<init>(JarFile.java:166)
at java.util.jar.JarFile.<init>(JarFile.java:130)
at org.apache.flink.client.program.PackagedProgram.extractContainedLibaries(PackagedProgram.java:647)
... 5 more
```
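The trace shows the failure inside `JarFile.<init>`, in the client before any job code runs, which suggests the JVM could not open a jar as a zip archive at all. A small check like the following (hypothetical helper `JarCheck`, standard `java.util.jar` only) could help tell an unreadable assembly apart from a classpath problem; running it against the shaded fat jar on the machine where the job is submitted would be the interesting case.

```scala
import java.io.IOException
import java.util.jar.JarFile
import java.util.zip.ZipException

// Hypothetical helper: can this file be opened as a jar at all?
object JarCheck {
  /** Right(number of entries) if the file opens as a jar, Left(reason) otherwise. */
  def openable(path: String): Either[String, Int] =
    try {
      val jar = new JarFile(path)
      try Right(jar.size()) finally jar.close()
    } catch {
      // ZipException is a subclass of IOException, so it must be matched first
      case e: ZipException => Left(s"not a readable zip archive: ${e.getMessage}")
      case e: IOException  => Left(s"I/O error: ${e.getMessage}")
    }

  def main(args: Array[String]): Unit =
    args.foreach(p => println(s"$p -> ${openable(p)}"))
}
```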
I removed the shaded dependency and got the previous error back.
Any clues?
Thanks,
Aris


From: [hidden email]
To: [hidden email]
Subject: RE: classpath issue on yarn
Date: Tue, 26 Apr 2016 21:03:50 +0000
