This is my working jar that i download it form .m2/org/apache/flink/flink-connector-elasticsearch2_2.11/1.1-SNAPSHOT2016-05-12 22:26 GMT+02:00 rafal green <[hidden email]>:Hi Gordon,Thanks for advice - it's work perfect but only in elasticsearch case.This pom version works for elasticsearch 2.2.1.<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
<version>1.1-SNAPSHOT</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>2.2.1</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/elasticsearch/**</includes>
</artifactItem>Why 2.2.1 ? Beacuse if you check the "flink/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml" you will see this line "<elasticsearch.version>2.2.1</elasticsearch.version>"But Gordon your idea not working with twitter-connector. and I try add this: (to pom) and it's not working<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-twitter_${scala.version}</artifactId>
<version>1.1-SNAPSHOT</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
<groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId>
<version>2.2.0</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>com/twitter/**</includes>
</artifactItem>or that<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-twitter_${scala.version}</artifactId>
<version>1.1-SNAPSHOT</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
<groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId>
<version>2.2.0</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>com/twitter/**</includes>
</artifactItem>
<artifactItem>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.2.5</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/httpcomponents/**</includes>
</artifactItem>
<artifactItem>
<groupId>com.twitter</groupId>
<artifactId>joauth</artifactId>
<version>6.0.2</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>com/twitter/**</includes>
</artifactItem>
<artifactItem>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.2.4</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/httpcomponents/**</includes>
</artifactItem>
<artifactItem>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>14.0.1</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>com/google/guava/**</includes>
</artifactItem>And if I run job I see this error:2016-05-12 21:49:37,681 INFO org.elasticsearch.plugins - [node-1] modules [], plugins [], sites [] 2016-05-12 21:49:37,738 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 5ff307efcde8deebfb2886733e40994c01fbba7d from localhost/127.0.0.1:47639 2016-05-12 21:49:38,109 INFO org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - Created Elasticsearch TransportClient org.elasticsearch.client.transport.TransportClient@66cdf89 2016-05-12 21:49:38,114 INFO org.apache.flink.streaming.connectors.twitter.TwitterSource - Initializing Twitter Streaming API connection 2016-05-12 21:49:38,357 INFO com.twitter.hbc.httpclient.BasicClient - New connection executed: flink-twitter-source, endpoint: /1.1/statuses/sample.json 2016-05-12 21:49:38,357 INFO org.apache.flink.streaming.connectors.twitter.TwitterSource - Twitter Streaming API connection established successfully 2016-05-12 21:49:38,376 WARN com.twitter.hbc.httpclient.ClientBase - flink-twitter-source Uncaught exception java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140) at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114) at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99) at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:85) at com.twitter.hbc.httpclient.RestartableHttpClient.setup(RestartableHttpClient.java:56) at com.twitter.hbc.httpclient.ClientBase.run(ClientBase.java:118) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2016-05-12 21:49:38,379 INFO com.twitter.hbc.httpclient.ClientBase - flink-twitter-source exit event - java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V 2016-05-12 21:49:38,380 INFO com.twitter.hbc.httpclient.ClientBase - flink-twitter-source Shutting down httpclient connection manager... and finaly "flink-connector-twitter_2.11-1.1-SNAPSHOT.jar" - if I add jar to this location: flink/build-target/lib/ - it's working. No idea why :P2016-05-12 0:32 GMT+02:00 Tzu-Li (Gordon) Tai <[hidden email]>:Hi Rafal,
From your description, it seems like Flink is complaining because it cannot
access the Elasticsearch API related dependencies as well. You'd also have
to include the following into your Maven build, under <artifactItems>:
<artifactItem>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>2.3.2</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/elasticsearch/**</includes>
</artifactItem>
Now your built jar should correctly include all required dependencies (the
connector & Elasticsearch API).
As explained in Linking with modules not contained in the binary
distribution
<https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution>
, it will be enough to package dependencies along with your code for Flink
to access all required dependencies, and you wouldn't need to copy the jar
to the lib folder. I would recommend to clean up the lib folder of the
previous jars you copied, and follow this approach in the future, just in
case they mess up the classloader.
As with your first attempt that Flink cannot find any Elasticsearch nodes
when executed in the IDE, I'm suspecting the reason is that the
elasticsearch2 connector by default uses version 2.2.1, lower than your
cluster version 2.3.2. I had previous experience when Elasticsearch
strangely complains not finding any nodes when using lower client versions
than the deployment. Can you try compiling the elasticsearch2 connector with
the option -Delasticsearch.version=2.3.2, and use the newly build connector
jar, following the same method mentioned above?
Hope this helps!
Cheers,
Gordon
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Local-Cluster-have-problem-with-connect-to-elasticsearch-tp6788p6838.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Free forum by Nabble | Edit this page |