unclear exception when writing to elasticsearch


unclear exception when writing to elasticsearch

Martin Neumann
Hej,

I'm trying to write to Elasticsearch from a streaming application and I get a weird error message that I can't decipher. Hopefully, someone here can help me. I'm trying to run the Java example from the website. I double-checked that I can reach Elasticsearch from the development machine by putting some data in with curl. Does anyone have an idea what the problem is?

Technical info:
Flink 1.1.3

Elasticsearch 2.4.1

http://bbc2.sics.se:19208/
{
  "name" : "hopsworks",
  "cluster_name" : "hops",
  "cluster_uuid" : "XIVrGHeaTc2nICQC85chpw",
  "version" : {
    "number" : "2.4.1",
    "build_hash" : "c67dc32e24162035d18d6fe1e952c4cbcbe79d16",
    "build_timestamp" : "2016-09-27T18:57:55Z",
    "build_snapshot" : false,
    "lucene_version" : "5.5.2"
  },
  "tagline" : "You Know, for Search"
}

Changes in the code:
Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "hops");

ArrayList<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("bbc2.sics.se"), 19208));
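
For context, here is a self-contained sketch of how that snippet fits around the connector. The index, sink function, and field names are invented for illustration, and the Flink-specific lines are shown as comments so the sketch compiles with only the JDK:

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EsSinkConfig {
    public static Map<String, String> buildConfig() {
        Map<String, String> config = new HashMap<>();
        // Flush after every element instead of buffering bulk requests.
        config.put("bulk.flush.max.actions", "1");
        // Must match the server's cluster_name ("hops" in the curl output above).
        config.put("cluster.name", "hops");
        return config;
    }

    public static List<InetSocketAddress> buildTransports() {
        List<InetSocketAddress> transports = new ArrayList<>();
        // createUnresolved skips the DNS lookup so this sketch runs anywhere;
        // the original code uses InetAddress.getByName("bbc2.sics.se") instead.
        transports.add(InetSocketAddress.createUnresolved("bbc2.sics.se", 19208));
        return transports;
    }

    public static void main(String[] args) {
        // With the flink-connector-elasticsearch2 jar on the classpath,
        // the wiring would be roughly:
        // stream.addSink(new ElasticsearchSink<>(buildConfig(), buildTransports(),
        //         new MyElasticsearchSinkFunction()));
        System.out.println(buildConfig());
    }
}
```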
 

Exception:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.threadpool.ThreadPool
at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:133)
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)
Reply | Threaded
Open this post in threaded view
|

Re: unclear exception when writing to elasticsearch

Tzu-Li (Gordon) Tai
Hi!

This could be an Elasticsearch server/client version conflict, or the uber jar of your code wasn’t built properly.

For the first possible issue, we’re currently using Elasticsearch 2.3.5 to build the Flink Elasticsearch Connector. Could you try overriding this version to 2.4.1 when building your code and see if the problem remains?
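
One way to do that, assuming a Maven build, is to declare the client explicitly alongside the connector dependency; the coordinates below are the standard Elasticsearch 2.x client artifact:

```xml
<!-- Force the Elasticsearch client to 2.4.1 to match the server.
     Maven's nearest-wins resolution lets this direct dependency override
     the 2.3.5 version pulled in transitively by the Flink connector. -->
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>2.4.1</version>
</dependency>
```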


Let me know if the problem remains after trying out the above :-)

Cheers,
Gordon


Re: unclear exception when writing to elasticsearch

Martin Neumann
Hej,

thanks for the fast reply.

I'm currently running things from inside my IDE, so it should not be a packaging problem. That said, I added the plugin from the link provided, but I'm not sure which Elasticsearch library is needed.

Where do I override the Elasticsearch version? The only thing I'm currently using is the flink-connector; do I have to modify its code?

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.1.3</version>
</dependency>

One thing I forgot to mention: I can only modify things locally, packing them into a jar. I'm stuck with stock Flink 1.1.3 for execution since I'm running on top of Hopsworks.

cheers Martin


Re: unclear exception when writing to elasticsearch

Tzu-Li (Gordon) Tai
Hi Martin,

You can do that by adding a dependency to the Elasticsearch client of your desired version in your project.

You can also check what Elasticsearch client version the project is using by checking `mvn dependency:tree` from the base directory of your project.

Cheers,
Gordon


Re: unclear exception when writing to elasticsearch

Martin Neumann
I tried changing the Elasticsearch version to 2.4.1, which results in a new exception:

Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.java:192)
at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)
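
For what it's worth, MoreExecutors.directExecutor() only exists in Guava 18.0 and later, and the Elasticsearch 2.x client calls it, so this NoSuchMethodError usually means an older Guava won dependency resolution. A possible Maven-side fix (a sketch, not a confirmed solution for this setup) is to pin Guava:

```xml
<!-- Elasticsearch 2.x needs Guava >= 18.0 (MoreExecutors.directExecutor()
     was added in 18.0). Pinning it in dependencyManagement forces Maven to
     resolve 18.0 even if another dependency drags in an older version. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>18.0</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```

Running `mvn dependency:tree` and searching for `com.google.guava` should show which version currently wins and where it comes from.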



Re: unclear exception when writing to elasticsearch

Flavio Pompermaier
Did you build Flink from sources, or are you using the packaged version? I ask because I had an annoying problem when compiling Flink with Maven 3.3+.

Maven 3.0.x, 3.1.x, and 3.2.x: it is sufficient to call mvn clean install -DskipTests in the root directory of the Flink code base.

Maven 3.3.x: the build has to be done in two steps, first in the base directory, then in the distribution project:

mvn clean install -DskipTests
cd flink-dist
mvn clean install

Note: to check your Maven version, run mvn --version.


Re: unclear exception when writing to elasticsearch

Martin Neumann
I created the project using the Maven archetype, so I'm using the packaged version pulled by Maven.

At this point, I just try to run it directly from inside the IDE (IntelliJ), mostly since I don't want to build and deploy it on the cluster all the time. I tried building it (Maven 3.0.5); it builds fine but fails to run on the cluster with the same exception that I get when I run things from within the IDE.

My guess is that some function names have changed between Elasticsearch versions and they are just not compatible anymore.

In the worst case, I will hack something together that writes the data using HttpURLConnection, pushing things to the REST interface (if that works from within Flink).
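
If it comes to that, a minimal sketch of the REST fallback could look like the following. The index and type names are invented; in Elasticsearch 2.x, a POST to /<index>/<type> creates a document:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class EsRestSink {
    // Build a one-field JSON document; the field name is hypothetical and
    // no escaping is done, so this only works for values without quotes.
    static String toJson(String field, String value) {
        return "{\"" + field + "\":\"" + value + "\"}";
    }

    // POST one document to the Elasticsearch REST API and return the
    // HTTP status code (201 on creation in ES 2.x).
    static int postDocument(String baseUrl, String index, String type,
                            String json) throws Exception {
        URL url = new URL(baseUrl + "/" + index + "/" + type);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        try (OutputStream os = conn.getOutputStream()) {
            os.write(json.getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }

    public static void main(String[] args) {
        // Network call left out here; e.g.:
        // postDocument("http://bbc2.sics.se:19208", "my-index", "my-type",
        //         toJson("message", "hello"));
        System.out.println(toJson("message", "hello"));
    }
}
```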


cheers Martin


Re: unclear exception when writing to elasticsearch

Tzu-Li (Gordon) Tai
Hi Martin,

Just letting you know I’m trying your setup right now, and will get back to you once I confirm the results.

- Gordon


On March 1, 2017 at 9:15:16 PM, Martin Neumann ([hidden email]) wrote:

I created the project using the maven archetype so I'm using the packaged version pulled by maven. 

At this point, I just try to run it directly from inside the IDE (IntelliJ), mostly since I don't want to build it and deploy it on the cluster all the time. I tried building it (maven 3.0.5), it builds fine but fails to run on the cluster with the same exception that I get if I run things from within the IDE. 

My guess is that maybe some function names have changed between elastic search versions and they are just not compatible anymore.

In the Worst case, I will hack something together that just writes the data using HttpURLConnection pushing things to the rest interface. (If that works from within flink)


cheers Martin

On Wed, Mar 1, 2017 at 12:24 PM, Flavio Pompermaier <[hidden email]> wrote:
Did you build Flink from sources or are you using the packeged version? 
Because I had an annoying problem when compiling Flink with maven > 3.3.

Maven 3.0.x, 3.1.x, and 3.2.x It is sufficient to call mvn clean install -DskipTests in the root directory of Flink code base.

Maven 3.3.x The build has to be done in two steps: First in the base directory, then in the distribution project:

mvn clean install -DskipTests
cd flink-dist
mvn clean install
Note: To check your Maven version, run mvn --version. 

On Wed, Mar 1, 2017 at 12:19 PM, Martin Neumann <[hidden email]> wrote:
I tried to change the elastic search version to 2.4.1 which results in a new exception:

Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.java:192)
at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)


On Wed, Mar 1, 2017 at 7:58 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Martin,

You can do that by adding a dependency to the Elasticsearch client of your desired version in your project.

You can also check what Elasticsearch client version the project is using by checking `mvn dependency:tree` from the base directory of your project.

Cheers,
Gordon


On March 1, 2017 at 1:21:56 AM, Martin Neumann ([hidden email]) wrote:

Hej,

thanks for the fast reply.

I'm currently running things from inside my IDE so it should not be a packaging problem. That said I added the plugin from the link provided but I'm not sure what elastic search library is needed.

Where do I override the elastic search version? The only thing I'm currently using is the flink-connector do I have to modify its code?

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.1.3</version>
</dependency>

One thing I forgot to mention: I can only modify things locally and package them into a jar. I'm stuck with stock Flink 1.1.3 for execution since I'm running things on top of Hopsworks.

cheers Martin

On Tue, Feb 28, 2017 at 5:42 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi!

This could be an Elasticsearch server/client version conflict, or the uber jar of your code wasn't built properly.

For the first possible issue, we’re currently using Elasticsearch 2.3.5 to build the Flink Elasticsearch Connector. Could you try overriding this version to 2.4.1 when building your code and see if the problem remains?


Let me know if the problem remains after trying out the above :-)

Cheers,
Gordon


Re: unclear exception when writing to elasticsearch

Tzu-Li (Gordon) Tai
Hi Martin,

I followed your setup:

1. Maven java quick start archetype (Flink version 1.1.3)
2. Added `flink-connector-elasticsearch2_2.10` version 1.1.3 dependency
3. Ran the example in the Flink Elasticsearch docs against an Elasticsearch 2.4.1 installation

and everything worked fine.

Just to make sure nothing is conflicting, you could also try running `mvn dependency:purge-local-repository` on your project, then re-download the dependencies with `mvn clean install`, and finally re-import your project in the IDE.

Let me know if this works for you!

Cheers,
Gordon



Re: unclear exception when writing to elasticsearch

Martin Neumann
Hej,

I finally found out what the problem was. I had added another dependency that was necessary to run things on Hops, and for some reason it broke things. When I remove it, everything works fine. I'm talking to the Hops guys about it to understand what's going on.

Thanks for the help.
Cheers Martin


