Exception on running an Elasticpipe flink connector

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Exception on running an Elasticpipe flink connector

vipul singh
Hello,

We are working on a Flink ES connector, sourcing from a kafka stream, and sinking data into elasticsearch. The code works fine in intellij, but while running the code on emr(version 5.9, which uses flinkĀ 1.3.2) using flink-yarn-session, we are seeing this exception

Using the parallelism provided by the remote cluster (1). To use another parallelism, set it at the ./bin/flink client.
Starting execution of program
2018-01-02 23:19:16,217 INFO  org.apache.flink.yarn.YarnClusterClient                       - Starting program in interactive mode

------------------------------------------------------------
 The program finished with the following exception:

java.lang.NoSuchMethodError: io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
	at org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
	at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:449)
	at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:91)
	at org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:976)
	at org.elasticsearch.transport.TcpTransport.sendRequest(TcpTransport.java:958)
	at org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:520)
	at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:465)
	at org.elasticsearch.transport.TransportService.submitRequest(TransportService.java:451)
	at org.elasticsearch.client.transport.TransportClientNodesService$SimpleNodeSampler.doSample(TransportClientNodesService.java:403)
	at org.elasticsearch.client.transport.TransportClientNodesService$NodeSampler.sample(TransportClientNodesService.java:338)
	at org.elasticsearch.client.transport.TransportClientNodesService.addTransportAddresses(TransportClientNodesService.java:179)
	at org.elasticsearch.client.transport.TransportClient.addTransportAddress(TransportClient.java:301)
On searching online, it seems like this maybe due to netty version conflicts.
However when we ran a dependency tree on our pom, and we dont see netty coming from anywhere else but flink: https://gist.github.com/neoeahit/b42b435e3c4519e632be87782de1cc06

Could you please suggest how can we resolve this error,

Thanks,
Vipul



Reply | Threaded
Open this post in threaded view
|

Re: Exception on running an Elasticpipe flink connector

Nico Kruber
Hi Vipul,
Yes, this looks like a problem with a different netty version being
picked up.

First of all, let me advertise Flink 1.4 for this since there we
properly shade away our netty dependency (on version 4.0.27 atm) so you
(or in this case Elasticsearch) can rely on your required version. Since
you are executing your job inside a Flink cluster, there will be some
more things in the classpath than what your job itself requires, e.g.
core/runtime Flink dependencies and also things from the Hadoop
classpath if present.

Locally in the IDE, only a limited set of libraries are included and the
classpath is set up a bit differently, I suppose. Maybe, you are also
affected by the Maven shading problem for maven >= 3.3 [1][2].
As a workaround, can you try to shade elasticsearch's netty away? See
[3] for details.


Regards
Nico

[1] https://issues.apache.org/jira/browse/FLINK-5013
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/start/building.html
[3]
https://stackoverflow.com/questions/42962892/flink-with-elasticsearch-5-sink-conflicts-of-io-netty-library

On 04/01/18 07:09, vipul singh wrote:

> Hello,
>
> We are working on a Flink ES connector, sourcing from a kafka stream,
> and sinking data into elasticsearch. The code works fine in intellij,
> but while running the code on emr(version 5.9, which uses flinkĀ 1.3.2)
> using flink-yarn-session, we are seeing this exception
>
>     Using the parallelism provided by the remote cluster (1). To use
>     another parallelism, set it at the ./bin/flink client.
>
>     Starting execution of program
>
>     2018-01-02 23:19:16,217 INFO org.apache.flink.yarn.YarnClusterClient
>     - Starting program in interactive mode
>
>     ------------------------------------------------------------
>
>     The program finished with the following exception:
>
>     java.lang.NoSuchMethodError:
>     io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
>
>     at
>     org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
>
>     at
>     org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:449)
>
>     at
>     org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:91)
>
>     at
>     org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:976)
>
>     at
>     org.elasticsearch.transport.TcpTransport.sendRequest(TcpTransport.java:958)
>
>     at
>     org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:520)
>
>     at
>     org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:465)
>
>     at
>     org.elasticsearch.transport.TransportService.submitRequest(TransportService.java:451)
>
>     at
>     org.elasticsearch.client.transport.TransportClientNodesService$SimpleNodeSampler.doSample(TransportClientNodesService.java:403)
>
>     at
>     org.elasticsearch.client.transport.TransportClientNodesService$NodeSampler.sample(TransportClientNodesService.java:338)
>
>     at
>     org.elasticsearch.client.transport.TransportClientNodesService.addTransportAddresses(TransportClientNodesService.java:179)
>
>     at
>     org.elasticsearch.client.transport.TransportClient.addTransportAddress(TransportClient.java:301)
>
> On searching online, it seems like this maybe due to netty version
> conflicts.
> However when we ran a dependency tree on our pom, and we dont see netty
> coming from anywhere else but flink:
> https://gist.github.com/neoeahit/b42b435e3c4519e632be87782de1cc06
>
> Could you please suggest how can we resolve this error,
>
> Thanks,
> Vipul
>
>
>


signature.asc (201 bytes) Download Attachment