Dear Sir or Madam,
Can you tell me why I have a problem with Elasticsearch on a local cluster? I followed this example: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/elasticsearch2.html

My Flink and Elasticsearch configurations are the defaults (I only changed node.name to "node-1"). The example runs in my IntelliJ IDEA 15, but on a local cluster it fails. Of course WordCount and SocketTextStreamWordCount work fine. I have spent 2 days trying to find a solution (with uncle Google ;) ), but it's not easy.

val config = new java.util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "elasticsearch")
config.put("path.home", "/media/user/e5e05ab5-28f3-4cee-a57c-444e32b99f04/thesis/elasticsearch-2.3.2")

val transports = new util.ArrayList[InetSocketAddress]
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))

Error output:

java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
    at java.lang.Thread.run(Thread.java:745)

05/08/2016 22:57:02 Job execution switched to status FAILING.
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
    at java.lang.Thread.run(Thread.java:745)
05/08/2016 22:57:02 Job execution switched to status FAILED.

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:541)
    at com.pl.greeny.flink.TwitterAnalysis$.main(TwitterAnalysis.scala:69)
    at com.pl.greeny.flink.TwitterAnalysis.main(TwitterAnalysis.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
    at java.lang.Thread.run(Thread.java:745)

Best regards,
Rafal Greeny
|
ElasticSearch is basically saying that it cannot connect. Is it possible that the configuration of elastic may be incorrect, or some of the ports may be blocked? On Mon, May 9, 2016 at 7:05 PM, rafal green <[hidden email]> wrote:
|
Seeing how you put a loopback address into the transport addresses, are you sure that an ElasticSearch node runs on every machine? On Wed, May 11, 2016 at 7:41 PM, Stephan Ewen <[hidden email]> wrote:
|
Hi,

Are you sure the Elasticsearch cluster is running correctly? Open a browser and try 127.0.0.1:9200; that should give you an overview of the cluster. If you don't get it, there is something wrong with the setup. It's also a good way to double-check the cluster.name (I got that wrong more than once).

I used to have some connection problems with older Elasticsearch versions (I don't remember which ones). I was able to get around them by retrying multiple times.

Cheers,
Martin

On Wed, May 11, 2016 at 7:43 PM, Stephan Ewen <[hidden email]> wrote:
|
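A minimal sketch of the check Martin describes, done from code rather than a browser: it tries to open a TCP connection to the node and retries a few times. The object and method names (EsReachability, canConnect, canConnectWithRetry) are made up for illustration; 9200 and 9300 are the default Elasticsearch HTTP and transport ports. A successful connect only shows that the port is open; it does not prove that cluster.name matches.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper, not part of Flink or Elasticsearch.
object EsReachability {
  /** True if a TCP connection to host:port succeeds within timeoutMs. */
  def canConnect(host: String, port: Int, timeoutMs: Int = 1000): Boolean =
    Try {
      val socket = new Socket()
      try socket.connect(new InetSocketAddress(host, port), timeoutMs)
      finally socket.close()
    }.isSuccess

  /** Repeats the check up to `attempts` times, pausing between failed tries. */
  def canConnectWithRetry(host: String, port: Int, attempts: Int, pauseMs: Long): Boolean =
    (1 to attempts).exists { i =>
      val ok = canConnect(host, port)
      if (!ok && i < attempts) Thread.sleep(pauseMs)
      ok
    }

  def main(args: Array[String]): Unit = {
    // 9200: HTTP port (what the browser uses); 9300: transport port (what the sink's client uses).
    for (port <- Seq(9200, 9300)) {
      val ok = canConnectWithRetry("127.0.0.1", port, attempts = 3, pauseMs = 500)
      println(s"127.0.0.1:$port reachable: $ok")
    }
  }
}
```

Running this on the machine that hosts the TaskManager shows whether the transport port is reachable from there, which is the address the sink in the original post connects to.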
Thanks a lot for the many answers :)

I wrote the previous email because I didn't understand the difference between running on a LOCAL cluster (one node) and running in IntelliJ IDEA. Now I know :P If you read "Linking with modules not contained in the binary distribution" and check the pom in the flink directory, you will see that it doesn't have this:

So Flink doesn't contain the CONNECTORS (Twitter, Elasticsearch, etc.) in the binary distribution. Now I understand, and I added this to my pom:

<plugin>

Of course I copied the created jar to "flink/build-target/lib". When I ran the job on the local cluster, I saw tweets without any errors. Half of my problem was resolved.

2) Connecting to Elasticsearch - I did the same as for the Twitter connector, but I still have a problem. My IntelliJ and my local cluster are on the same machine, so:

"Seeing how you put a loopback address into the transport addresses, are you sure that an ElasticSearch node runs on every machine?" - YES, BECAUSE IT'S ON MY NOTEBOOK

"Are you sure the elastic cluster is running correctly?" - YES, BECAUSE I SEE THE INDEXES, NODE AND CLUSTER NAME when I open http://127.0.0.1:9200/_cat/ or http://127.0.0.1:9200/ etc.

"Is it possible that the configuration of elastic may be incorrect, or some of the ports may be blocked?" - NO, BECAUSE I RUN THIS IN MY INTELLIJ IDEA AND I HAVE NO PROBLEM CONNECTING TO ELASTICSEARCH 2.3.2 ;)

My pom with the Elasticsearch connector:

<plugin>

When I copy the jar to "flink/build-target/lib", I no longer get "not connected to any elasticsearch node", but I get a class-not-found error for org.elasticsearch.Client. OK, so I copied the next jar from the mvn repository, and then I got the next problem with Joda-Time :D and the next and the next :D And it doesn't work :(

So what is the correct way to connect to Elasticsearch 2.3.2 on a local cluster?

2016-05-11 20:35 GMT+02:00 Martin Neumann <[hidden email]>:
|
Hi Rafal,
From your description, it seems that Flink is complaining because it also cannot access the Elasticsearch API dependencies. You also have to include the following in your Maven build, under <artifactItems>:

<artifactItem>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.3.2</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>org/elasticsearch/**</includes>
</artifactItem>

Now your built jar should correctly include all required dependencies (the connector and the Elasticsearch API). As explained in "Linking with modules not contained in the binary distribution", packaging the dependencies along with your code is enough for Flink to access them, and you don't need to copy the jar to the lib folder. I recommend cleaning the previously copied jars out of the lib folder and following this approach in the future, just in case they mess up the classloader.

As for your first attempt, where Flink could not find any Elasticsearch nodes when executed in the IDE, I suspect the reason is that the elasticsearch2 connector uses version 2.2.1 by default, which is lower than your cluster version 2.3.2. I have previously seen Elasticsearch strangely complain about not finding any nodes when the client version is lower than the deployment's. Can you try compiling the elasticsearch2 connector with the option -Delasticsearch.version=2.3.2 and using the newly built connector jar, following the same method mentioned above?

Hope this helps!

Cheers,
Gordon
|
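To see whether the dependencies Gordon mentions are actually visible at runtime, and from which jar, a small classpath probe can help. This is only a sketch: ClasspathCheck and locationOf are hypothetical names, and the class names in main are the ones that appear in this thread (the Joda-Time class name is an assumption based on the "joda Time" error mentioned above). It uses the classloader that loaded the probe itself, so it is most informative when packaged and run the same way as the job.

```scala
// Hypothetical diagnostic helper, not part of Flink or Elasticsearch.
object ClasspathCheck {
  /** Where a class was loaded from (jar or directory); None if it cannot be loaded. */
  def locationOf(className: String): Option[String] =
    try {
      // initialize = false: only locate the class, do not run its static initializers.
      val cls = Class.forName(className, false, getClass.getClassLoader)
      val location = for {
        source <- Option(cls.getProtectionDomain.getCodeSource)
        url    <- Option(source.getLocation)
      } yield url.toString
      Some(location.getOrElse("<no code source, e.g. a JDK class>"))
    } catch {
      // Missing class, or a class that is present but cannot be linked.
      case _: ClassNotFoundException | _: LinkageError => None
    }

  def main(args: Array[String]): Unit = {
    val names = Seq(
      "org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink",
      "org.elasticsearch.client.Client",
      "org.joda.time.DateTime") // assumed class name for the Joda-Time dependency
    for (name <- names)
      println(s"$name -> ${locationOf(name).getOrElse("NOT FOUND")}")
  }
}
```

If a class shows up from a jar in the lib folder rather than from the job jar, that is a hint that leftover copies are still being picked up.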
Hi Gordon,

Thanks for the advice - it works perfectly, but only for Elasticsearch. This pom version works for Elasticsearch 2.2.1:

<artifactItem>

Why 2.2.1? Because if you check "flink/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml" you will see this line: "<elasticsearch.version>2.2.1</elasticsearch.version>"

But Gordon, your idea does not work with the twitter-connector. I tried adding this to the pom, and it does not work:

<artifactItem>

or this:

<artifactItem>

When I run the job I see this error:

2016-05-12 21:49:37,681 INFO org.elasticsearch.plugins - [node-1] modules [], plugins [], sites []
2016-05-12 21:49:37,738 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 5ff307efcde8deebfb2886733e40994c01fbba7d from localhost/127.0.0.1:47639
2016-05-12 21:49:38,109 INFO org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - Created Elasticsearch TransportClient org.elasticsearch.client.transport.TransportClient@66cdf89
2016-05-12 21:49:38,114 INFO org.apache.flink.streaming.connectors.twitter.TwitterSource - Initializing Twitter Streaming API connection
2016-05-12 21:49:38,357 INFO com.twitter.hbc.httpclient.BasicClient - New connection executed: flink-twitter-source, endpoint: /1.1/statuses/sample.json
2016-05-12 21:49:38,357 INFO org.apache.flink.streaming.connectors.twitter.TwitterSource - Twitter Streaming API connection established successfully
2016-05-12 21:49:38,376 WARN com.twitter.hbc.httpclient.ClientBase - flink-twitter-source Uncaught exception
java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
    at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
    at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114)
    at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99)
    at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:85)
    at com.twitter.hbc.httpclient.RestartableHttpClient.setup(RestartableHttpClient.java:56)
    at com.twitter.hbc.httpclient.ClientBase.run(ClientBase.java:118)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2016-05-12 21:49:38,379 INFO com.twitter.hbc.httpclient.ClientBase - flink-twitter-source exit event - java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
2016-05-12 21:49:38,380 INFO com.twitter.hbc.httpclient.ClientBase - flink-twitter-source Shutting down httpclient connection manager
...

And finally, "flink-connector-twitter_2.11-1.1-SNAPSHOT.jar" - if I add the jar to this location: flink/build-target/lib/ - it works. No idea why :P

2016-05-12 0:32 GMT+02:00 Tzu-Li (Gordon) Tai <[hidden email]>:
Hi Rafal,
|
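A NoSuchMethodError like the one in the log above usually means that the class loaded at runtime comes from a different library version than the one the caller was compiled against. A hedged way to check this is to ask, via reflection, whether the loaded class really has the constructor named in the error. ConstructorCheck and hasConstructor are hypothetical names; the class and parameter types in main are copied from the error message.

```scala
// Hypothetical diagnostic helper, not part of Flink or the Twitter client.
object ConstructorCheck {
  /** True if `className` can be loaded and has a public constructor with exactly these parameter types. */
  def hasConstructor(className: String, paramTypes: Seq[String]): Boolean =
    try {
      // initialize = false: only inspect the class, do not run its static initializers.
      val cls = Class.forName(className, false, getClass.getClassLoader)
      cls.getConstructors.exists { ctor =>
        ctor.getParameterTypes.map(_.getName).toSeq == paramTypes
      }
    } catch {
      // Missing class, or a class whose signature types cannot be resolved.
      case _: ClassNotFoundException | _: LinkageError => false
    }

  def main(args: Array[String]): Unit = {
    // Signature taken from the NoSuchMethodError in the log.
    val found = hasConstructor(
      "org.apache.http.impl.conn.DefaultClientConnectionOperator",
      Seq("org.apache.http.conn.scheme.SchemeRegistry", "org.apache.http.conn.DnsResolver"))
    println(s"DefaultClientConnectionOperator(SchemeRegistry, DnsResolver) present: $found")
  }
}
```

If this prints false when run with the same classpath as the failing job, an httpclient version without that constructor is being loaded ahead of the one the Twitter client expects.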
This is my working jar, which I took from .m2/org/apache/flink/flink-connector-elasticsearch2_2.11/1.1-SNAPSHOT

2016-05-12 22:26 GMT+02:00 rafal green <[hidden email]>:
flink-connector-elasticsearch2_2.11-1.1-SNAPSHOT.jar.zip (3M) Download Attachment |
Sorry, not the jar from the elasticsearch-connector but the one from the twitter-connector, ".m2/org/apache/flink/flink-connector-twitter_2.11/1.1-SNAPSHOT" - it works fine.

2016-05-12 22:35 GMT+02:00 rafal green <[hidden email]>:
flink-connector-twitter_2.11-1.1-SNAPSHOT.jar.zip (4M) Download Attachment |
...btw I found this (in "flink/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml"):

<!-- Allow users to pass custom connector versions -->
<properties>
    <elasticsearch.version>2.2.1</elasticsearch.version>
</properties>

I changed it to 2.3.2 and of course rebuilt with the command "mvn clean install -DskipTests" ...but nothing changed.

2016-05-12 22:39 GMT+02:00 rafal green <[hidden email]>:
|