Hi,
Last Friday I was running elasticsearch 5.X with Flink 1.2.0 In the pom.xml I added this dependency: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch-base_2.10</artifactId> <version>1.3-SNAPSHOT</version> <exclusions> <exclusion> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> </exclusion> </exclusions> </dependency> And I added to two files : the ElasticsearchSink.java and Elasticsearch5ApiCallBridge.java from the flink github. And this code was running with no problem: public static void writeToElastic(DataStream<HashMap<String, Object>> elasticStream) { HashMap<String, String> config = new HashMap<>(); config.put("bulk.flush.max.actions", "1"); config.put("cluster.name", "clouduxlogs"); try { ArrayList<InetSocketAddress> transports = new ArrayList<>(); transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300)); ElasticsearchSinkFunction<HashMap<String, Object>> indexLog = new ElasticsearchSinkFunction<HashMap<String, Object>>() { private static final long serialVersionUID = 8802869701292023100L; public IndexRequest createIndexRequest(HashMap<String, Object> element) { HashMap<String, HashMap<String,Object>> valueOfLog = new HashMap<>(); element.put("timestamp", (new Timestamp((new Date()).getTime())).toString()); valueOfLog.put("data", element); //{aggregation : { aggregationType : "value", "value" : 468, "count" : 1, "timestamp": } } return Requests .indexRequest() .index("logs") .type("object") .source(valueOfLog); } public void process(HashMap<String, Object> element, RuntimeContext ctx, RequestIndexer indexer) { indexer.add(createIndexRequest(element)); } }; SinkFunction<HashMap<String, Object>> esSink = new ElasticsearchSink<HashMap<String, Object>>(config, transports, indexLog); elasticStream.addSink(esSink); } catch (Exception e) { System.out.println(e); } } but in monday those files (ElasticsearchSink.java and Elasticsearch5ApiCallBridge.java) was changed and now my code doesn't work. I have tried to use this dependency: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch5_2.10</artifactId> <version>1.3-SNAPSHOT</version> </dependency> but I'm getting this error: java.lang.NoSuchMethodError: org.apache.flink.util.InstantiationUtil.isSerializable(Ljava/lang/Object;)Z at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.<init>(ElasticsearchSinkBase.java:195) at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:95) at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:78) at ux.App.writeToElastic(App.java:102) at ux.App.main(App.java:55) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339) at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073) at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120) at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117) at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116) Do I need to downgrade my elastic version or there is some other way to make it work? Thanks, Fábio Dias. |
Hi,
The reason for this exception is because the `isSerializable `method only exists in 1.3-SNAPSHOT of `flink-core` at the moment. These kind of errors can usually be expected to happen if you are using mismatching versions of Flink libraries and core Flink dependencies. Elasticsearch 5 will be released with Flink 1.3.0 (targeted release time is end of May). For the time being, if Elasticsearch 5 is a must, you could try implementing a copy of the `isSerializable` method under the exact same package path / method and class name in your own project. However, I can not guarantee that this will work as there may be other conflicts. - Gordon On March 2, 2017 at 10:47:52 PM, Fábio Dias ([hidden email]) wrote:
|
Free forum by Nabble | Edit this page |