Re: Local Cluster have problem with connect to elasticsearch

Posted by rafal green on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Local-Cluster-have-problem-with-connect-to-elasticsearch-tp6788p6837.html

Thanks a lot for all the answers :)

Last time I wrote this email because I didn't understand the difference between a LOCAL cluster (one node) and IntelliJ IDEA. Now I know :P

https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html  

If you read "Linking with modules not contained in the binary distribution" and check the pom in the flink directory, you'll see it doesn't have this:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>
So Flink doesn't include the CONNECTORS (Twitter, Elasticsearch, etc.) in the binary distribution. Now I understand, so I added this to my pom:


<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-dependency-plugin</artifactId>
  <version>2.9</version>
  <executions>
    <execution>
      <id>unpack</id>
      <!-- executed just before the package phase -->
      <phase>prepare-package</phase>
      <goals>
        <goal>unpack</goal>
      </goals>
      <configuration>
        <artifactItems>
          <!-- For Flink connector classes -->
          <artifactItem>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-twitter_${scala.version}</artifactId>
            <version>1.1-SNAPSHOT</version>
            <type>jar</type>
            <overWrite>false</overWrite>
            <outputDirectory>${project.build.directory}/classes</outputDirectory>
            <includes>org/apache/flink/**</includes>
          </artifactItem>
        </artifactItems>
      </configuration>
    </execution>
  </executions>
</plugin>
Of course, I copied the created jar to "flink/build-target/lib".

And when I ran the job on the local cluster, I saw tweets without any errors. Half of my problem was resolved.
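A quick way to see which classes the cluster can actually load is to try loading them by name from inside the job. This is a minimal sketch; ClasspathCheck is a hypothetical helper, not part of Flink, and the class names in the usage line are only examples taken from this thread (the Elasticsearch client interface itself lives in the org.elasticsearch.client package).

```scala
// Hypothetical helper (not a Flink API): reports which fully qualified class
// names cannot be loaded by the given class loader.
object ClasspathCheck {
  def missing(
      classNames: Seq[String],
      loader: ClassLoader = Thread.currentThread().getContextClassLoader
  ): Seq[String] =
    classNames.filterNot { name =>
      try {
        // initialize = false: load the class without running its static initializers
        Class.forName(name, false, loader)
        true
      } catch {
        // ClassNotFoundException: the class itself is absent.
        // LinkageError (e.g. NoClassDefFoundError): the class is present but
        // something it depends on is not.
        case _: ClassNotFoundException | _: LinkageError => false
      }
    }
}
```

Calling, for example, `ClasspathCheck.missing(Seq("org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink", "org.elasticsearch.client.Client", "org.joda.time.DateTime"))` from code that runs on the TaskManager (say, a RichFunction's open(), where the sink also fails) should list every missing class in one run instead of one error per attempt.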

2) Connecting to Elasticsearch: I did the same as for the Twitter connector, but I still have a problem.

My IntelliJ and the local cluster are on the same machine, so:


"Seeing how you put a loopback address into the transport addresses, are you sure that an ElasticSearch node runs on every machine?" - YES, BECAUSE IT'S ON MY NOTEBOOK

"Are you sure the elastic cluster is running correctly?" - YES, BECAUSE I SEE THE INDEXES, NODE and CLUSTER NAME when I open http://127.0.0.1:9200/_cat/ or http://127.0.0.1:9200/ etc.

"Is it possible that the configuration of elastic may be incorrect, or some of the ports may be blocked?" - NO, BECAUSE WHEN I RUN THIS IN INTELLIJ IDEA I HAVE NO PROBLEM CONNECTING TO ELASTICSEARCH 2.3.2 ;)


My pom with the Elasticsearch connector:
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-dependency-plugin</artifactId>
  <version>2.9</version>
  <executions>
    <execution>
      <id>unpack</id>
      <!-- executed just before the package phase -->
      <phase>prepare-package</phase>
      <goals>
        <goal>unpack</goal>
      </goals>
      <configuration>
        <artifactItems>
          <!-- For Flink connector classes -->
          <artifactItem>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-twitter_${scala.version}</artifactId>
            <version>1.1-SNAPSHOT</version>
            <type>jar</type>
            <overWrite>false</overWrite>
            <outputDirectory>${project.build.directory}/classes</outputDirectory>
            <includes>org/apache/flink/**</includes>
          </artifactItem>

          <artifactItem>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
            <version>1.1-SNAPSHOT</version>
            <type>jar</type>
            <overWrite>false</overWrite>
            <outputDirectory>${project.build.directory}/classes</outputDirectory>
            <includes>org/apache/flink/**</includes>
          </artifactItem>
        </artifactItems>
      </configuration>
    </execution>
  </executions>
</plugin>


And when I copy the jar to "flink/build-target/lib", I no longer get "not connected to any elasticsearch node", but I get a NoClassFound error for org.elasticsearch.Client.
OK, so I copied the next jar from the Maven repository, and then I had the next problem, with Joda-Time :D

And the next, and the next :D

And it still doesn't work :(
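The "next and the next" pattern is what you would expect if each artifactItem is unpacked on its own: the unpack goal copies only the artifacts it lists, not their dependencies, so every missing transitive dependency shows up as a new class-not-found error. The sketch below computes that closure with a breadth-first walk over a hypothetical, simplified dependency graph; the edges are illustrative only, not the real Elasticsearch 2.3.2 dependency tree.

```scala
import scala.collection.mutable

object DependencyClosure {
  // Breadth-first walk: every artifact reachable from `root` has to end up
  // on the cluster's classpath, not just `root` itself.
  def closure(root: String, deps: Map[String, Seq[String]]): Set[String] = {
    val seen = mutable.LinkedHashSet(root)
    val queue = mutable.Queue(root)
    while (queue.nonEmpty) {
      val current = queue.dequeue()
      for (dep <- deps.getOrElse(current, Seq.empty) if !seen.contains(dep)) {
        seen += dep
        queue.enqueue(dep)
      }
    }
    seen.toSet
  }

  // HYPOTHETICAL, simplified graph for illustration only.
  val exampleGraph: Map[String, Seq[String]] = Map(
    "flink-connector-elasticsearch2" -> Seq("elasticsearch"),
    "elasticsearch" -> Seq("lucene-core", "joda-time"),
    "lucene-core" -> Seq.empty
  )
}
```

Rather than copying each jar by hand, one common alternative is to let Maven compute this closure and bundle it into a single fat jar, for example with the maven-shade-plugin.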



So what is the correct way to connect to Elasticsearch 2.3.2 from a local cluster?



2016-05-11 20:35 GMT+02:00 Martin Neumann <[hidden email]>:
Hi,

Are you sure the elastic cluster is running correctly? 

Open a browser and try 127.0.0.1:9200; that should give you an overview of the cluster. If you don't get it, something is wrong with the setup. It's also a good way to double-check the cluster.name (I got that wrong more than once).
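The same check can be scripted: the HTTP root endpoint returns JSON that includes a "cluster_name" field, and that value has to match the "cluster.name" entry passed to the sink. A minimal sketch; ClusterNameCheck is a hypothetical helper, and it uses a naive regex instead of a JSON library to stay dependency-free.

```scala
import scala.io.Source
import scala.util.Try

object ClusterNameCheck {
  // Matches  "cluster_name" : "<value>"  in the root endpoint's JSON response.
  private val ClusterNamePattern = "\"cluster_name\"\\s*:\\s*\"([^\"]*)\"".r

  // Naive regex extraction; a real JSON parser would be sturdier.
  def parseClusterName(json: String): Option[String] =
    ClusterNamePattern.findFirstMatchIn(json).map(_.group(1))

  // Fetches the HTTP root endpoint; None if the node is unreachable
  // or the response contains no cluster_name.
  def fetchClusterName(url: String = "http://127.0.0.1:9200/"): Option[String] =
    Try {
      val source = Source.fromURL(url)
      try source.mkString finally source.close()
    }.toOption.flatMap(parseClusterName)
}
```

Comparing `ClusterNameCheck.fetchClusterName()` with the configured `cluster.name` catches the mismatch Martin mentions before the job is even submitted.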

I used to have some connection problems with older Elasticsearch versions (I don't remember which one). I was able to get around them by retrying multiple times.
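Retrying can be wrapped in a small generic helper, for instance around a connectivity check run before the job starts. This is a sketch under assumptions: Retry is hypothetical and not part of Flink or the Elasticsearch client, and it uses a fixed delay between attempts.

```scala
import scala.util.{Failure, Success, Try}

object Retry {
  // Runs `op` up to `attempts` times, sleeping `delayMs` after each failure.
  // Returns the first success, or the last failure if every attempt failed.
  // Try only catches non-fatal exceptions.
  @annotation.tailrec
  def retry[T](attempts: Int, delayMs: Long)(op: => T): Try[T] =
    Try(op) match {
      case s @ Success(_) => s
      case f @ Failure(_) if attempts <= 1 => f
      case Failure(_) =>
        Thread.sleep(delayMs)
        retry(attempts - 1, delayMs)(op)
    }
}
```

A fixed delay keeps the sketch short; exponential backoff would be the usual refinement if the node takes a while to come up.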

cheers Martin

On Wed, May 11, 2016 at 7:43 PM, Stephan Ewen <[hidden email]> wrote:
Seeing how you put a loopback address into the transport addresses, are you sure that an ElasticSearch node runs on every machine?

On Wed, May 11, 2016 at 7:41 PM, Stephan Ewen <[hidden email]> wrote:
ElasticSearch is basically saying that it cannot connect.

Is it possible that the configuration of elastic may be incorrect, or some of the ports may be blocked?
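Whether a port is blocked can be tested independently of Flink with a plain TCP connect. The sink's transport client talks to the transport port (9300 by default in Elasticsearch 2.x), while the browser check uses the HTTP port (9200), so a working browser check alone does not prove 9300 is reachable. A minimal sketch; PortCheck is a hypothetical helper.

```scala
import java.net.{InetSocketAddress, Socket}

object PortCheck {
  // True if something accepts a TCP connection on host:port within timeoutMs.
  def isReachable(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Connection refused, timeout, unknown host, ...
      case _: java.io.IOException => false
    } finally socket.close()
  }
}
```

Running `PortCheck.isReachable("127.0.0.1", 9300)` on the machine where the TaskManager runs would separate a network problem from a classpath problem.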


On Mon, May 9, 2016 at 7:05 PM, rafal green <[hidden email]> wrote:
Dear Sir or Madam,

Can you tell me why I have a problem with Elasticsearch on a local cluster?


My Flink and Elasticsearch configs are the defaults (I only changed node.name to "node-1").

This example runs in my IntelliJ IDEA 15, but on the local cluster I have a problem. Of course, WordCount and SocketTextStreamWordCount work fine.


I spent 2 days trying to find a solution (with uncle Google ;) ), but it's not easy.

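import java.net.{InetAddress, InetSocketAddress}
import java.util

// Sink settings; "cluster.name" must match the cluster_name reported on port 9200
val config = new util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1") // flush after every single element
config.put("cluster.name", "elasticsearch")
config.put("path.home", "/media/user/e5e05ab5-28f3-4cee-a57c-444e32b99f04/thesis/elasticsearch-2.3.2")

// Elasticsearch transport port (9300), not the HTTP port (9200)
val transports = new util.ArrayList[InetSocketAddress]
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))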



Error output:

java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)

05/08/2016 22:57:02 Job execution switched to status FAILING.
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)
05/08/2016 22:57:02 Job execution switched to status FAILED.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:541)
at com.pl.greeny.flink.TwitterAnalysis$.main(TwitterAnalysis.scala:69)
at com.pl.greeny.flink.TwitterAnalysis.main(TwitterAnalysis.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)


Best regards,
Rafal Greeny