Re: Kinesis connector classpath issue when running Flink 1.1-SNAPSHOT on YARN

Posted by Josh on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Kinesis-connector-classpath-issue-when-running-Flink-1-1-SNAPSHOT-on-YARN-tp7611p7622.html

I found that I can still write to S3 using my Flink build of 1.1-SNAPSHOT, for example if I run the word count example:
./bin/flink run ./examples/batch/WordCount.jar --input hdfs:///tmp/LICENSE --output s3://permutive-flink/wordcount-result.txt

This works fine - it's only the RocksDBStateBackend that fails with the s3 URI. I'm wondering if it could be an issue with RocksDBStateBackend itself?
 

On Fri, Jun 17, 2016 at 12:09 PM, Josh <[hidden email]> wrote:
Hi Gordon/Fabian,

Thanks for helping with this! Downgrading the Maven version I was using to build Flink appears to have fixed that problem - I was using Maven 3.3.3 before and have downgraded to 3.2.5. 

Just for reference, I printed the loaded class at runtime and found that when I was using Flink built with Maven 3.3.3, it was pulling in:
jar:file:/opt/flink/flink-1.1-SNAPSHOT/lib/flink-dist_2.11-1.1-SNAPSHOT.jar!/org/apache/http/params/HttpConnectionParams.class
But after building with the older Maven version, it pulled in the class from my jar:
jar:file:/tmp/my-assembly-1.0.jar!/org/apache/http/params/HttpConnectionParams.class


Unfortunately, now that that problem is fixed, I've got a different classpath issue. It started with:

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:383)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:175)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:144)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:205)

This is strange because I used an s3:// checkpoint directory when running Flink 1.0.3 on EMR and it worked fine (according to https://ci.apache.org/projects/flink/flink-docs-master/setup/aws.html#provide-s3-filesystem-dependency, no configuration should be needed to use S3 when running on EMR).

Anyway I tried executing /etc/hadoop/conf/hadoop-env.sh before running my job, as this sets up the HADOOP_CLASSPATH env var. The exception then changed to:
java.lang.NoClassDefFoundError: org/apache/hadoop/fs/common/Abortable

I found that this class is related to a jar called s3-dist-cp, so then I tried copying that jar to Flink's lib directory from /usr/share/aws/emr/s3-dist-cp/lib/*

And now I'm back to another Kinesis connector classpath error:

java.lang.NoClassDefFoundError: org/apache/http/conn/ssl/SSLSocketFactory
at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:136)
at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:120)
at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:157)
at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:137)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:76)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:166)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:140)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:123)

I guess this is related to me adding a bunch of extra stuff to the classpath in an attempt to solve the EmrFileSystem error. Any ideas what caused that error in the first place?
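When extra jars have been added to the classpath, it can help to list every location from which a given class is visible. The following is a minimal sketch using only the JDK; the class name `ClasspathDuplicates` and the method `findAll` are hypothetical names chosen for illustration, not part of Flink or the AWS SDK. It assumes the check runs with the same class loader as the failing code, which on YARN may differ from the loader used when running the tool standalone.

```java
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper (illustration only, not a Flink API).
final class ClasspathDuplicates {

    // Returns every URL under which the class file for className is visible
    // to the given class loader. An empty list means the loader cannot see it.
    static List<URL> findAll(String className, ClassLoader loader) throws IOException {
        // ClassLoader resource names use '/' separators and no leading slash.
        String resource = className.replace('.', '/') + ".class";
        return new ArrayList<>(Collections.list(loader.getResources(resource)));
    }

    public static void main(String[] args) throws IOException {
        // Default taken from the stack trace above; pass another name as args[0].
        String name = args.length > 0 ? args[0] : "org.apache.http.conn.ssl.SSLSocketFactory";
        List<URL> hits = findAll(name, ClassLoader.getSystemClassLoader());
        if (hits.isEmpty()) {
            System.out.println(name + " is not visible to this class loader");
        }
        for (URL url : hits) {
            System.out.println(url);
        }
    }
}
```

More than one result suggests that several jars ship the same class, in which case the one that wins depends on the class loader's search order. No result suggests the jar is missing from the classpath altogether.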

By the way, I built Flink with:
mvn clean install -Pinclude-kinesis,vendor-repos -DskipTests -Dhadoop.version=2.7.1

Josh

On Fri, Jun 17, 2016 at 9:56 AM, Fabian Hueske <[hidden email]> wrote:
Hi Josh,

I assume that you built the SNAPSHOT version yourself. I had similar version conflicts for Apache HttpCore with Flink SNAPSHOT versions on EMR.
The problem is caused by a change in behavior in Maven 3.3 and later versions.
Due to these changes, the dependency shading is not working correctly. That's why we use Maven 3.2 to build the Flink release artifacts.

Can you check whether you used Maven 3.3 and try to downgrade to 3.2 if that was the case?

Cheers, Fabian

2016-06-17 8:12 GMT+02:00 Tai Gordon <[hidden email]>:
Hi Josh,

I’m looking into the problem. Seems like the connector is somehow using older versions of httpclient.
Can you print the loaded class path at runtime, and check the path & version of the loaded httpclient / httpcore dependency?
i.e. `classOf[HttpConnectionParams].getResource("HttpConnectionParams.class").toString`
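For jobs written in Java rather than Scala, the same check can be sketched as below. This is only an illustration: `ClassOrigin` and `locate` are hypothetical names, and the default class `java.lang.String` is used so the example runs without httpcore on the classpath. In the real job one would pass `org.apache.http.params.HttpConnectionParams` instead.

```java
import java.net.URL;

// Hypothetical helper (illustration only): reports where the JVM found the
// .class file backing a loaded class.
final class ClassOrigin {

    // Returns the URL of the class file for cls, or null if it cannot be resolved.
    static URL locate(Class<?> cls) {
        // An absolute resource name: leading '/', '/' separators, ".class" suffix.
        String resource = "/" + cls.getName().replace('.', '/') + ".class";
        return cls.getResource(resource);
    }

    public static void main(String[] args) throws ClassNotFoundException {
        String name = args.length > 0 ? args[0] : "java.lang.String";
        Class<?> cls = Class.forName(name);
        System.out.println(name + " -> " + locate(cls));
    }
}
```

The printed URL names the jar that supplied the class, which shows whether it came from the Flink distribution jar or from the job's own assembly jar.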

Also, on which commit was your kinesis connector built?

Regards,
Gordon


On June 17, 2016 at 1:08:37 AM, Josh ([hidden email]) wrote:

Hey,

I've been running the Kinesis connector successfully now for a couple of weeks, on a Flink cluster running Flink 1.0.3 on EMR 2.7.1/YARN.

Today I've been trying to get it working on a cluster running the current Flink master (1.1-SNAPSHOT) but am running into a classpath issue when starting the job. This only happens when running on EMR/YARN (it's fine when running 1.1-SNAPSHOT locally, and when running 1.0.3 on EMR)

----
 The program finished with the following exception:

java.lang.NoSuchMethodError: org.apache.http.params.HttpConnectionParams.setSoKeepalive(Lorg/apache/http/params/HttpParams;Z)V
at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96)
at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:187)
at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:136)
at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:120)
at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:157)
at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:137)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:76)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:166)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:140)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:123)
---

Any ideas what's going on? 

The job I'm deploying has httpclient 4.3.6 and httpcore 4.3.3 which I believe are the libraries with the HttpConnectionParams class.
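A NoSuchMethodError like the one above usually means the class was found, but in a version that lacks the method. One way to confirm this at runtime is to list the methods the loaded class actually exposes. The sketch below uses plain reflection; `MethodProbe` and `publicMethodsNamed` are hypothetical names, and the defaults in `main` use a JDK class so the example runs without httpcore. In the failing job, the arguments would be `org.apache.http.params.HttpConnectionParams` and `setSoKeepalive`.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (illustration only): checks which public methods with a
// given name exist on the version of a class that the JVM actually loaded.
final class MethodProbe {

    // Returns the full signatures of all public methods called methodName on
    // the named class. An empty list means the loaded version has no such method.
    static List<String> publicMethodsNamed(String className, String methodName)
            throws ClassNotFoundException {
        Class<?> cls = Class.forName(className);
        List<String> found = new ArrayList<>();
        for (Method m : cls.getMethods()) {
            if (m.getName().equals(methodName)) {
                found.add(m.toString());
            }
        }
        return found;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        String className = args.length > 0 ? args[0] : "java.lang.String";
        String methodName = args.length > 1 ? args[1] : "isEmpty";
        List<String> found = publicMethodsNamed(className, methodName);
        if (found.isEmpty()) {
            System.out.println("No public method " + methodName + " on " + className);
        }
        for (String signature : found) {
            System.out.println(signature);
        }
    }
}
```

An empty result for `setSoKeepalive` would indicate that an older httpcore than the one bundled with the job is being picked up first.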

Thanks,
Josh