Elasticsearch 5.x connection

Fábio Dias
Hi,

Last Friday I was running Elasticsearch 5.x with Flink 1.2.0.

In the pom.xml I added this dependency: 

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
    <version>1.3-SNAPSHOT</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
    </exclusions>
</dependency>

I also added two files to my project, ElasticsearchSink.java and Elasticsearch5ApiCallBridge.java, copied from the Flink GitHub repository.

And this code was running with no problem:

public static void writeToElastic(DataStream<HashMap<String, Object>> elasticStream) {

    HashMap<String, String> config = new HashMap<>();

    config.put("bulk.flush.max.actions", "1");
    config.put("cluster.name", "clouduxlogs");

    try {
        ArrayList<InetSocketAddress> transports = new ArrayList<>();
        transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));

        ElasticsearchSinkFunction<HashMap<String, Object>> indexLog = new ElasticsearchSinkFunction<HashMap<String, Object>>() {

            private static final long serialVersionUID = 8802869701292023100L;

            public IndexRequest createIndexRequest(HashMap<String, Object> element) {
                HashMap<String, HashMap<String, Object>> valueOfLog = new HashMap<>();
                element.put("timestamp", (new Timestamp((new Date()).getTime())).toString());
                valueOfLog.put("data", element);
                //{aggregation : { aggregationType : "value", "value" : 468, "count" : 1, "timestamp":  } }

                return Requests
                        .indexRequest()
                        .index("logs")
                        .type("object")
                        .source(valueOfLog);
            }

            public void process(HashMap<String, Object> element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        };

        SinkFunction<HashMap<String, Object>> esSink = new ElasticsearchSink<HashMap<String, Object>>(config, transports, indexLog);
        elasticStream.addSink(esSink);
    } catch (Exception e) {
        System.out.println(e);
    }
}

But on Monday those files (ElasticsearchSink.java and Elasticsearch5ApiCallBridge.java) were changed, and now my code doesn't work.

I have tried to use this dependency: 

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
    <version>1.3-SNAPSHOT</version>
</dependency>

but I'm getting this error:

java.lang.NoSuchMethodError: org.apache.flink.util.InstantiationUtil.isSerializable(Ljava/lang/Object;)Z
        at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.<init>(ElasticsearchSinkBase.java:195)
        at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:95)
        at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:78)
        at ux.App.writeToElastic(App.java:102)
        at ux.App.main(App.java:55)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
        at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
        at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
        at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)

Do I need to downgrade my Elasticsearch version, or is there some other way to make it work?

Thanks,
Fábio Dias.

Re: Elasticsearch 5.x connection

Tzu-Li (Gordon) Tai
Hi,

java.lang.NoSuchMethodError: org.apache.flink.util.InstantiationUtil.isSerializable(Ljava/lang/Object;)Z
        at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.<init>(ElasticsearchSinkBase.java:195)
        at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:95)
        at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(

This exception occurs because the `isSerializable` method currently exists only in the 1.3-SNAPSHOT version of `flink-core`. Errors of this kind can usually be expected when the versions of the Flink libraries and the core Flink dependencies do not match, as is the case here with a 1.3-SNAPSHOT connector running on Flink 1.2.0.
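As a quick way to confirm this kind of mismatch, one could check by reflection whether the method is visible on the classpath the job actually runs with. The following is only a minimal sketch: the class name `MethodPresenceCheck` and the helper `hasMethod` are hypothetical names chosen for illustration and are not part of Flink.

```java
public class MethodPresenceCheck {

    // Returns true if a public method with the given name and parameter types
    // can be found on the named class; false if the class or method is missing.
    public static boolean hasMethod(String className, String methodName, Class<?>... paramTypes) {
        try {
            // Load the class without running its static initializers.
            Class<?> clazz = Class.forName(className, false,
                    MethodPresenceCheck.class.getClassLoader());
            clazz.getMethod(methodName, paramTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class, method, and parameter type are taken from the stack trace above.
        boolean present = hasMethod(
                "org.apache.flink.util.InstantiationUtil", "isSerializable", Object.class);
        System.out.println("isSerializable(Object) present: " + present);
    }
}
```

If this reports that the method is absent when run with the same dependencies as the job, the `flink-core` on the classpath is older than the one the connector was compiled against.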

Elasticsearch 5 support will be released with Flink 1.3.0 (the targeted release time is the end of May). For the time being, if Elasticsearch 5 is a must, you could try adding a copy of the `isSerializable` method to your own project, under the exact same package path, class name, and method name. However, I cannot guarantee that this will work, as there may be other conflicts.
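For illustration, a method with the signature from the stack trace (`isSerializable(Object)` returning `boolean`) could look roughly like the sketch below. This is an assumption about the behavior, not a copy of the Flink source: it simply attempts Java serialization and reports whether that succeeded. The class name `SerializableCheck` is hypothetical; to follow the workaround above, the method would have to live in a class with the same package and name as in the stack trace, and such a copy would probably also need whatever other methods of that class the rest of Flink calls.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class SerializableCheck {

    // Tries to serialize the object with standard Java serialization.
    // Returns true on success, false if serialization fails with an IOException
    // (which includes NotSerializableException).
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable("text"));       // String implements Serializable
        System.out.println(isSerializable(new Object())); // plain Object does not
    }
}
```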

- Gordon



On March 2, 2017 at 10:47:52 PM, Fábio Dias ([hidden email]) wrote:

java.lang.NoSuchMethodError: org.apache.flink.util.InstantiationUtil.isSerializable(Ljava/lang/Object;)Z
        at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.<init>(ElasticsearchSinkBase.java:195)
        at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:95)
        at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(