Local cluster has a problem connecting to Elasticsearch


Local cluster has a problem connecting to Elasticsearch

rafal green
Dear Sir or Madam,

Can you tell me why I have a problem with Elasticsearch in a local cluster?

My Flink and Elasticsearch configs are the defaults (I only changed node.name to "node-1").

This example runs in my IntelliJ IDEA 15, but on the local cluster I have a problem. WordCount and SocketTextStreamWordCount work fine, of course.

I spent two days trying to find a solution (with uncle Google ;) ), but it's not easy.

import java.net.{InetAddress, InetSocketAddress}
import java.util

val config = new java.util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "elasticsearch")
config.put("path.home", "/media/user/e5e05ab5-28f3-4cee-a57c-444e32b99f04/thesis/elasticsearch-2.3.2")

val transports = new util.ArrayList[InetSocketAddress]
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
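(For these values to work, they have to line up with the Elasticsearch side. A minimal sketch of the matching settings in config/elasticsearch.yml, assuming stock Elasticsearch 2.x defaults:)

```yaml
# config/elasticsearch.yml (assumed stock 2.x defaults)
cluster.name: elasticsearch   # must match the "cluster.name" passed to the sink
node.name: node-1
# Elasticsearch 2.x binds to localhost by default, so a transport client
# can reach it at 127.0.0.1:9300 (transport.tcp.port defaults to 9300).
```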



Error output:

java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)

05/08/2016 22:57:02 Job execution switched to status FAILING.
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)
05/08/2016 22:57:02 Job execution switched to status FAILED.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:541)
at com.pl.greeny.flink.TwitterAnalysis$.main(TwitterAnalysis.scala:69)
at com.pl.greeny.flink.TwitterAnalysis.main(TwitterAnalysis.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)


Best regards,
Rafal Greeny

Re: Local cluster has a problem connecting to Elasticsearch

Stephan Ewen
Elasticsearch is basically saying that it cannot connect.

Is it possible that the Elasticsearch configuration is incorrect, or that some of the ports are blocked?


On Mon, May 9, 2016 at 7:05 PM, rafal green <[hidden email]> wrote:


Re: Local cluster has a problem connecting to Elasticsearch

Stephan Ewen
Seeing that you put a loopback address into the transport addresses: are you sure an Elasticsearch node runs on every machine?

On Wed, May 11, 2016 at 7:41 PM, Stephan Ewen <[hidden email]> wrote:



Re: Local cluster has a problem connecting to Elasticsearch

Martin Neumann
Hi,

Are you sure the Elasticsearch cluster is running correctly?

Open a browser and try 127.0.0.1:9200; that should give you an overview of the cluster. If you don't get it, there is something wrong with the setup. It's also a good way to double-check the cluster.name (I've gotten that wrong more than once).

I used to have some connection problems with older Elasticsearch versions (I don't remember which one). I was able to get around them by retrying multiple times.

Cheers,
Martin

On Wed, May 11, 2016 at 7:43 PM, Stephan Ewen <[hidden email]> wrote:




Re: Local cluster has a problem connecting to Elasticsearch

rafal green
Thanks a lot for the many answers :)

I wrote my last email because I didn't understand the difference between a LOCAL cluster (one node) and IntelliJ IDEA. Now I know :P

If you read "Linking with modules not contained in the binary distribution" and check the pom in the flink directory, you'll see that it doesn't have this:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>
So the Flink binary distribution doesn't contain the connectors (Twitter, Elasticsearch, etc.). Now I understand, and I added this to my pom:


<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-dependency-plugin</artifactId>
  <version>2.9</version>
  <executions>
    <execution>
      <id>unpack</id>
      <!-- executed just before the package phase -->
      <phase>prepare-package</phase>
      <goals>
        <goal>unpack</goal>
      </goals>
      <configuration>
        <artifactItems>
          <!-- For Flink connector classes -->
          <artifactItem>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-twitter_${scala.version}</artifactId>
            <version>1.1-SNAPSHOT</version>
            <type>jar</type>
            <overWrite>false</overWrite>
            <outputDirectory>${project.build.directory}/classes</outputDirectory>
            <includes>org/apache/flink/**</includes>
          </artifactItem>
        </artifactItems>
      </configuration>
    </execution>
  </executions>
</plugin>
Of course I copied the created jar to "flink/build-target/lib".

When I ran the job on the local cluster, I saw tweets without any errors. Half of my problem was resolved.

2) Connecting to Elasticsearch: I did the same as with the Twitter connector, but I still have a problem.

My IntelliJ and local cluster are on the same machine, so:

"Seeing how you put a loopback address into the transport addresses, are you sure that an ElasticSearch node runs on every machine?" - Yes, because it is all on my notebook.

"Are you sure the elastic cluster is running correctly?" - Yes, because I see the indexes, the node and the cluster name when I open http://127.0.0.1:9200/_cat/ or http://127.0.0.1:9200/ etc.

"Is it possible that the configuration of elastic may be incorrect, or some of the ports may be blocked?" - No, because I run this in my IntelliJ IDEA and I don't have a problem connecting to Elasticsearch 2.3.2 ;)


My pom with the Elasticsearch connector:
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-dependency-plugin</artifactId>
  <version>2.9</version>
  <executions>
    <execution>
      <id>unpack</id>
      <!-- executed just before the package phase -->
      <phase>prepare-package</phase>
      <goals>
        <goal>unpack</goal>
      </goals>
      <configuration>
        <artifactItems>
          <!-- For Flink connector classes -->
          <artifactItem>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-twitter_${scala.version}</artifactId>
            <version>1.1-SNAPSHOT</version>
            <type>jar</type>
            <overWrite>false</overWrite>
            <outputDirectory>${project.build.directory}/classes</outputDirectory>
            <includes>org/apache/flink/**</includes>
          </artifactItem>

          <artifactItem>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
            <version>1.1-SNAPSHOT</version>
            <type>jar</type>
            <overWrite>false</overWrite>
            <outputDirectory>${project.build.directory}/classes</outputDirectory>
            <includes>org/apache/flink/**</includes>
          </artifactItem>
        </artifactItems>
      </configuration>
    </execution>
  </executions>
</plugin>


When I copy the jar to "flink/build-target/lib", I no longer get the "not connected to any Elasticsearch nodes" error, but instead a NoClassDefFoundError for org.elasticsearch.Client.
OK, so I copy the next jar from the Maven repository, and then I get the next problem, with Joda-Time :D

And the next, and the next :D

And it doesn't work :(

So what is the correct way to connect to Elasticsearch 2.3.2 on a local cluster?
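(Copying individual jars into the lib folder chases one missing transitive dependency after another. A common alternative, sketched here assuming a standard Maven setup, is to build a fat job jar with the maven-shade-plugin so that the connector and its transitive dependencies, such as the Elasticsearch client and Joda-Time, travel with the job; the plugin version below is an assumption:)

```xml
<!-- Sketch: bundle the connector and its transitive dependencies into
     the job jar. Flink's own dependencies (flink-scala,
     flink-streaming-scala, ...) should be declared with
     <scope>provided</scope> so the cluster's copies are used instead. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.4.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```

Submitting the resulting fat jar with bin/flink run should then work without copying anything into flink/build-target/lib.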



2016-05-11 20:35 GMT+02:00 Martin Neumann <[hidden email]>:
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)


Best regards,
Rafal Greeny




Re: Local Cluster have problem with connect to elasticsearch

Tzu-Li Tai
Hi Rafal,

From your description, it seems like Flink is complaining because it cannot access the Elasticsearch API-related dependencies either. You'd also have to include the following in your Maven build, under <artifactItems>:

<artifactItem>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.3.2</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>org/elasticsearch/**</includes>
</artifactItem>

Now your built jar should correctly include all required dependencies (the connector & Elasticsearch API).

As explained in Linking with modules not contained in the binary distribution, packaging the dependencies along with your code is enough for Flink to access everything it needs; you shouldn't have to copy jars into the lib folder. I would recommend cleaning the previously copied jars out of the lib folder and following this approach in the future, in case they confuse the classloader.

As for your first attempt, where Flink could not find any Elasticsearch nodes: I suspect the reason is that the elasticsearch2 connector uses version 2.2.1 by default, lower than your cluster version 2.3.2. I have previously seen Elasticsearch strangely complain about not finding any nodes when the client version is lower than the deployment. Can you try compiling the elasticsearch2 connector with the option -Delasticsearch.version=2.3.2, and use the newly built connector jar, following the same method mentioned above?

Hope this helps!

Cheers,
Gordon
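
Gordon's suggested rebuild would look roughly like this (the directory layout is as of the Flink 1.1-SNAPSHOT source tree quoted elsewhere in this thread; adjust the path to your checkout):

```shell
# Rebuild only the elasticsearch2 connector against Elasticsearch 2.3.2,
# overriding the default <elasticsearch.version> from the connector's pom.xml.
cd flink/flink-streaming-connectors/flink-connector-elasticsearch2
mvn clean install -DskipTests -Delasticsearch.version=2.3.2
```

The rebuilt jar then lands in the local Maven repository (~/.m2/repository/org/apache/flink/...), from where the application build picks it up.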
Re: Local Cluster have problem with connect to elasticsearch

rafal green
Hi Gordon,

Thanks for the advice - it works perfectly, but only for the Elasticsearch part.

This POM version works for Elasticsearch 2.2.1:
<artifactItem>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
    <version>1.1-SNAPSHOT</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.2.1</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>org/elasticsearch/**</includes>
</artifactItem>


Why 2.2.1? Because if you check "flink/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml", you will see the line "<elasticsearch.version>2.2.1</elasticsearch.version>".


But Gordon, your idea does not work with the Twitter connector. I tried adding this to the POM, and it does not work:

<artifactItem>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-twitter_${scala.version}</artifactId>
    <version>1.1-SNAPSHOT</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
    <groupId>com.twitter</groupId>
    <artifactId>hbc-core</artifactId>
    <version>2.2.0</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>com/twitter/**</includes>
</artifactItem>


or this:

<artifactItem>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-twitter_${scala.version}</artifactId>
    <version>1.1-SNAPSHOT</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
    <groupId>com.twitter</groupId>
    <artifactId>hbc-core</artifactId>
    <version>2.2.0</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>com/twitter/**</includes>
</artifactItem>
<artifactItem>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.2.5</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>org/apache/httpcomponents/**</includes>
</artifactItem>
<artifactItem>
    <groupId>com.twitter</groupId>
    <artifactId>joauth</artifactId>
    <version>6.0.2</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>com/twitter/**</includes>
</artifactItem>
<artifactItem>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore</artifactId>
    <version>4.2.4</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>org/apache/httpcomponents/**</includes>
</artifactItem>
<artifactItem>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>14.0.1</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>com/google/guava/**</includes>
</artifactItem>


And when I run the job, I see this error:

2016-05-12 21:49:37,681 INFO  org.elasticsearch.plugins                                     - [node-1] modules [], plugins [], sites []
2016-05-12 21:49:37,738 INFO  org.apache.flink.runtime.blob.BlobCache                       - Downloading 5ff307efcde8deebfb2886733e40994c01fbba7d from localhost/127.0.0.1:47639
2016-05-12 21:49:38,109 INFO  org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - Created Elasticsearch TransportClient org.elasticsearch.client.transport.TransportClient@66cdf89
2016-05-12 21:49:38,114 INFO  org.apache.flink.streaming.connectors.twitter.TwitterSource   - Initializing Twitter Streaming API connection
2016-05-12 21:49:38,357 INFO  com.twitter.hbc.httpclient.BasicClient                        - New connection executed: flink-twitter-source, endpoint: /1.1/statuses/sample.json
2016-05-12 21:49:38,357 INFO  org.apache.flink.streaming.connectors.twitter.TwitterSource   - Twitter Streaming API connection established successfully
2016-05-12 21:49:38,376 WARN  com.twitter.hbc.httpclient.ClientBase                         - flink-twitter-source Uncaught exception
java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
	at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
	at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114)
	at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99)
	at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:85)
	at com.twitter.hbc.httpclient.RestartableHttpClient.setup(RestartableHttpClient.java:56)
	at com.twitter.hbc.httpclient.ClientBase.run(ClientBase.java:118)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
2016-05-12 21:49:38,379 INFO  com.twitter.hbc.httpclient.ClientBase                         - flink-twitter-source exit event - java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
2016-05-12 21:49:38,380 INFO  com.twitter.hbc.httpclient.ClientBase                         - flink-twitter-source Shutting down httpclient connection manager



...and finally, "flink-connector-twitter_2.11-1.1-SNAPSHOT.jar" - if I add the jar to flink/build-target/lib/, it works. No idea why :P
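
The NoSuchMethodError above is the classic symptom of two httpclient/httpcore versions on the classpath: hbc-core expects a DefaultClientConnectionOperator constructor that an older httpcore does not provide. One way to check which versions the build actually resolves (a diagnostic sketch; run it in the project directory):

```shell
# List every org.apache.httpcomponents artifact on the classpath,
# to spot conflicting httpclient/httpcore versions pulled in transitively.
mvn dependency:tree -Dincludes=org.apache.httpcomponents
```

If two different versions show up, the hard-coded 4.2.x versions in the <artifactItem> list above may be shadowing the ones hbc-core was compiled against.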



Re: Local Cluster have problem with connect to elasticsearch

rafal green
This is my working jar, which I downloaded from .m2/org/apache/flink/flink-connector-elasticsearch2_2.11/1.1-SNAPSHOT.




[Attachment: flink-connector-elasticsearch2_2.11-1.1-SNAPSHOT.jar.zip (3M)]
Re: Local Cluster have problem with connect to elasticsearch

rafal green
Sorry, not the jar from the Elasticsearch connector, but the one from the Twitter connector: ".m2/org/apache/flink/flink-connector-twitter_2.11/1.1-SNAPSHOT" - it works fine.





[Attachment: flink-connector-twitter_2.11-1.1-SNAPSHOT.jar.zip (4M)]
Re: Local Cluster have problem with connect to elasticsearch

rafal green
...btw, I found this (in "flink/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml"):

<!-- Allow users to pass custom connector versions -->
<properties>
    <elasticsearch.version>2.2.1</elasticsearch.version>
</properties>

I changed it to version 2.3.2 and of course rebuilt with the command "mvn clean install -DskipTests",

...but nothing changed.


2016-05-12 22:39 GMT+02:00 rafal green <[hidden email]>:
Sorry, I meant the jar from the twitter connector, not the elasticsearch connector: ".m2/org/apache/flink/flink-connector-twitter_2.11/1.1-SNAPSHOT" — that one works fine.

2016-05-12 22:35 GMT+02:00 rafal green <[hidden email]>:
This is my working jar; I downloaded it from .m2/org/apache/flink/flink-connector-elasticsearch2_2.11/1.1-SNAPSHOT

2016-05-12 22:26 GMT+02:00 rafal green <[hidden email]>:
Hi Gordon,

Thanks for the advice. It works perfectly, but only in the Elasticsearch case.

This pom version works for Elasticsearch 2.2.1:
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
<version>1.1-SNAPSHOT</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>2.2.1</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/elasticsearch/**</includes>
</artifactItem>


Why 2.2.1? Because if you check "flink/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml" you will see the line "<elasticsearch.version>2.2.1</elasticsearch.version>".


But Gordon, your idea does not work with the twitter connector. I tried adding this to the pom, and it does not work:

<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-twitter_${scala.version}</artifactId>
<version>1.1-SNAPSHOT</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
<groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId>
<version>2.2.0</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>com/twitter/**</includes>
</artifactItem>


or this:

<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-twitter_${scala.version}</artifactId>
<version>1.1-SNAPSHOT</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
<groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId>
<version>2.2.0</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>com/twitter/**</includes>
</artifactItem>
<artifactItem>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.2.5</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/httpcomponents/**</includes>
</artifactItem>
<artifactItem>
<groupId>com.twitter</groupId>
<artifactId>joauth</artifactId>
<version>6.0.2</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>com/twitter/**</includes>
</artifactItem>
<artifactItem>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.2.4</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/httpcomponents/**</includes>
</artifactItem>
<artifactItem>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>14.0.1</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>com/google/guava/**</includes>
</artifactItem>


And when I run the job I see this error:

2016-05-12 21:49:37,681 INFO  org.elasticsearch.plugins                                     - [node-1] modules [], plugins [], sites []
2016-05-12 21:49:37,738 INFO  org.apache.flink.runtime.blob.BlobCache                       - Downloading 5ff307efcde8deebfb2886733e40994c01fbba7d from localhost/127.0.0.1:47639
2016-05-12 21:49:38,109 INFO  org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - Created Elasticsearch TransportClient org.elasticsearch.client.transport.TransportClient@66cdf89
2016-05-12 21:49:38,114 INFO  org.apache.flink.streaming.connectors.twitter.TwitterSource   - Initializing Twitter Streaming API connection
2016-05-12 21:49:38,357 INFO  com.twitter.hbc.httpclient.BasicClient                        - New connection executed: flink-twitter-source, endpoint: /1.1/statuses/sample.json
2016-05-12 21:49:38,357 INFO  org.apache.flink.streaming.connectors.twitter.TwitterSource   - Twitter Streaming API connection established successfully
2016-05-12 21:49:38,376 WARN  com.twitter.hbc.httpclient.ClientBase                         - flink-twitter-source Uncaught exception
java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
	at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
	at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114)
	at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99)
	at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:85)
	at com.twitter.hbc.httpclient.RestartableHttpClient.setup(RestartableHttpClient.java:56)
	at com.twitter.hbc.httpclient.ClientBase.run(ClientBase.java:118)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
2016-05-12 21:49:38,379 INFO  com.twitter.hbc.httpclient.ClientBase                         - flink-twitter-source exit event - java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
2016-05-12 21:49:38,380 INFO  com.twitter.hbc.httpclient.ClientBase                         - flink-twitter-source Shutting down httpclient connection manager
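(The NoSuchMethodError above is the usual symptom of an older httpclient on the classpath shadowing the 4.2.x one that hbc-core needs: the DefaultClientConnectionOperator constructor taking a DnsResolver only exists from httpclient 4.2 on. One hedged way around such a clash, assuming the job jar is built with the maven-shade-plugin rather than the unpack approach above, is to relocate the bundled httpcomponents packages so they cannot collide with whatever copy the cluster classpath already carries. This is a sketch, not a fix confirmed in this thread.)

```xml
<!-- Sketch: relocate the shaded httpclient/httpcore packages so hbc-core
     always binds to the 4.2.x classes bundled in the job jar rather than
     an older copy elsewhere on the cluster classpath. -->
<configuration>
  <relocations>
    <relocation>
      <pattern>org.apache.http</pattern>
      <shadedPattern>shaded.org.apache.http</shadedPattern>
    </relocation>
  </relocations>
</configuration>
```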



 ... and finally, "flink-connector-twitter_2.11-1.1-SNAPSHOT.jar": if I add the jar to flink/build-target/lib/, it works. No idea why :P


2016-05-12 0:32 GMT+02:00 Tzu-Li (Gordon) Tai <[hidden email]>:
Hi Rafal,

From your description, it seems like Flink is complaining because it cannot
access the Elasticsearch API-related dependencies either. You'd also have
to include the following in your Maven build, under <artifactItems>:

<artifactItem>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.3.2</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>org/elasticsearch/**</includes>
</artifactItem>

Now your built jar should correctly include all required dependencies (the
connector & Elasticsearch API).

Hope this helps!

Cheers,
Gordon


