Hi,
I am using elasticsearch 5.4.3 version in my flink project(flink version 1.3.1) Details 1. Using Maven build tool. 2. Running from intellij IDE. 3. Elasticsearch is running on the local machine. Have added the following maven dependency <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch5_2.10</artifactId> <version>1.3.1</version> </dependency> *code added* Map<String, String> config = new HashMap<>(); config.put("cluster.name", "elasticsearch"); config.put("bulk.flush.max.actions", "1"); List<InetSocketAddress> transportAddresses = new ArrayList<>(); transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); alerts.addSink(new ElasticsearchSink<AggResult>(config, transportAddresses, new ElasticsearchSinkFunction<AggResult>() { public IndexRequest createIndexRequest(AggResult aggResult){ Map<String, Long> json = new HashMap<>(); json.put("totalCount", aggResult.getTotalCount()); return Requests .indexRequest() .index("logdata") .type("consolidatedStreamData") .source(json); } @Override public void process(AggResult aggResult, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { requestIndexer.add(createIndexRequest(aggResult)); } })); *This results in the following error.* Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor; at org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:52) at ECSPrototype$2.process(ECSPrototype.java:148) at ECSPrototype$2.process(ECSPrototype.java:134) at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:282) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:327) at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:303) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) Anyidea what is wrong here ? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
I also had problems with ES 5.4.3 and I had to modify the connector code...I fear that the code is compatible only up to ES 5.2 or similar.. On Wed, Aug 30, 2017 at 5:40 AM, Raj Kumar <[hidden email]> wrote: Hi, Flavio Pompermaier Development Department OKKAM S.r.l. Tel. +(39) 0461 1823908 |
That's correct Flavio. The issue has been reported as https://issues.apache.org/jira/browse/FLINK-73862017-08-30 9:21 GMT+02:00 Flavio Pompermaier <[hidden email]>:
|
Free forum by Nabble | Edit this page |