Elasticsearch Sink - Error

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Elasticsearch Sink - Error

Raj Kumar
Hi,
I am using elasticsearch 5.4.3 version in my flink project(flink version
1.3.1)
Details
1. Using Maven build tool.
2. Running from intellij IDE.
3. Elasticsearch is running on the local machine.

Have added the following maven dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
    <version>1.3.1</version>
</dependency>


*code added*

Map<String, String> config = new HashMap<>();
            config.put("cluster.name", "elasticsearch");
            config.put("bulk.flush.max.actions", "1");

            List<InetSocketAddress> transportAddresses = new ArrayList<>();
            transportAddresses.add(new
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));

            alerts.addSink(new ElasticsearchSink<AggResult>(config,
transportAddresses, new ElasticsearchSinkFunction<AggResult>() {
                public IndexRequest createIndexRequest(AggResult aggResult){
                    Map<String, Long> json = new HashMap<>();
                    json.put("totalCount", aggResult.getTotalCount());

                    return Requests
                            .indexRequest()
                            .index("logdata")
                            .type("consolidatedStreamData")
                            .source(json);

                }
                @Override
                public void process(AggResult aggResult, RuntimeContext
runtimeContext, RequestIndexer requestIndexer) {
                    requestIndexer.add(createIndexRequest(aggResult));
                }
            }));



*This results in the following error.*

Caused by: java.lang.NoSuchMethodError:
org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
        at
org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:52)
        at ECSPrototype$2.process(ECSPrototype.java:148)
        at ECSPrototype$2.process(ECSPrototype.java:134)
        at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:282)
        at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:327)
        at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:303)
        at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
        at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)


Anyidea what is wrong here ?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Elasticsearch Sink - Error

Flavio Pompermaier
I also had problems with ES 5.4.3 and I had to modify the connector code...I fear that the code is compatible only up to ES 5.2 or similar..

On Wed, Aug 30, 2017 at 5:40 AM, Raj Kumar <[hidden email]> wrote:
Hi,
I am using elasticsearch 5.4.3 version in my flink project(flink version
1.3.1)
Details
1. Using Maven build tool.
2. Running from intellij IDE.
3. Elasticsearch is running on the local machine.

Have added the following maven dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
    <version>1.3.1</version>
</dependency>


*code added*

Map<String, String> config = new HashMap<>();
            config.put("cluster.name", "elasticsearch");
            config.put("bulk.flush.max.actions", "1");

            List<InetSocketAddress> transportAddresses = new ArrayList<>();
            transportAddresses.add(new
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));

            alerts.addSink(new ElasticsearchSink<AggResult>(config,
transportAddresses, new ElasticsearchSinkFunction<AggResult>() {
                public IndexRequest createIndexRequest(AggResult aggResult){
                    Map<String, Long> json = new HashMap<>();
                    json.put("totalCount", aggResult.getTotalCount());

                    return Requests
                            .indexRequest()
                            .index("logdata")
                            .type("consolidatedStreamData")
                            .source(json);

                }
                @Override
                public void process(AggResult aggResult, RuntimeContext
runtimeContext, RequestIndexer requestIndexer) {
                    requestIndexer.add(createIndexRequest(aggResult));
                }
            }));



*This results in the following error.*

Caused by: java.lang.NoSuchMethodError:
org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
        at
org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:52)
        at ECSPrototype$2.process(ECSPrototype.java:148)
        at ECSPrototype$2.process(ECSPrototype.java:134)
        at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:282)
        at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:327)
        at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:303)
        at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
        at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)


Anyidea what is wrong here ?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908
Reply | Threaded
Open this post in threaded view
|

Re: Elasticsearch Sink - Error

Fabian Hueske-2
That's correct Flavio.
The issue has been reported as https://issues.apache.org/jira/browse/FLINK-7386

Best, Fabian

2017-08-30 9:21 GMT+02:00 Flavio Pompermaier <[hidden email]>:
I also had problems with ES 5.4.3 and I had to modify the connector code...I fear that the code is compatible only up to ES 5.2 or similar..

On Wed, Aug 30, 2017 at 5:40 AM, Raj Kumar <[hidden email]> wrote:
Hi,
I am using elasticsearch 5.4.3 version in my flink project(flink version
1.3.1)
Details
1. Using Maven build tool.
2. Running from intellij IDE.
3. Elasticsearch is running on the local machine.

Have added the following maven dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
    <version>1.3.1</version>
</dependency>


*code added*

Map<String, String> config = new HashMap<>();
            config.put("cluster.name", "elasticsearch");
            config.put("bulk.flush.max.actions", "1");

            List<InetSocketAddress> transportAddresses = new ArrayList<>();
            transportAddresses.add(new
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));

            alerts.addSink(new ElasticsearchSink<AggResult>(config,
transportAddresses, new ElasticsearchSinkFunction<AggResult>() {
                public IndexRequest createIndexRequest(AggResult aggResult){
                    Map<String, Long> json = new HashMap<>();
                    json.put("totalCount", aggResult.getTotalCount());

                    return Requests
                            .indexRequest()
                            .index("logdata")
                            .type("consolidatedStreamData")
                            .source(json);

                }
                @Override
                public void process(AggResult aggResult, RuntimeContext
runtimeContext, RequestIndexer requestIndexer) {
                    requestIndexer.add(createIndexRequest(aggResult));
                }
            }));



*This results in the following error.*

Caused by: java.lang.NoSuchMethodError:
org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
        at
org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:52)
        at ECSPrototype$2.process(ECSPrototype.java:148)
        at ECSPrototype$2.process(ECSPrototype.java:134)
        at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:282)
        at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:327)
        at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:303)
        at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
        at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)


Anyidea what is wrong here ?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. <a href="tel:+39%200461%20182%203908" value="+3904611823908" target="_blank">+(39) 0461 1823908