ElasticSearch 6 - error with UpdateRequest

Averell
Good day everyone,

I tried to send UpdateRequest(s) to ElasticSearch6, and I got the following
error:

Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
        at org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:76)

Below is my ElasticsearchSinkFunction:

        import org.elasticsearch.action.update.UpdateRequest
        def upsertRequest(element: T): UpdateRequest = {
                new UpdateRequest(
                        "myIndex",
                        "record",
                        s"${element.id}")
                .doc(element.toMap())
        }
        override def process(element: T, runtimeContext: RuntimeContext,
                        requestIndexer: RequestIndexer): Unit = {
                requestIndexer.add(upsertRequest(element))
        }

What could be the issue here?

Thanks for your help.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: ElasticSearch 6 - error with UpdateRequest

Timo Walther
Hi,

thanks for your feedback. I agree that the current interfaces are
not flexible enough to fit every use case. The unified connector API
is a very recent feature that still needs some polishing. I'm working
on a design document to improve the situation there.

For now, you can simply implement a utility method that
iterates over the column names and types of the TableSchema and calls
`schema.field(name, type)` for each of them.

I hope this helps.

Regards,
Timo


In reply to Averell's message of 31.08.18, 08:10.




Re: ElasticSearch 6 - error with UpdateRequest

Timo Walther
In reply to this post by Averell
Hi Averell,

Sorry for my other mail, which was sent in error.

I also observed this issue when implementing FLINK-3875. Currently,
update requests are broken due to a binary incompatibility. I already
have a fix for this in a different branch. I opened FLINK-10269 [1] to
track the issue.

As a workaround, you can simply copy
org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer
into your project. This should ensure that the class is compiled
against the Elasticsearch version you use. If it doesn't help, please let us know.
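
To check which copy of the class is actually picked up at runtime, a small helper along the following lines may be useful. This is only a sketch: the class name `ClassOriginCheck` and the method `originOf` are made up for illustration, and the result is only meaningful when the check runs inside the job, with the same class loader as the sink.

```java
import java.security.CodeSource;

// Hypothetical helper, not part of Flink: reports where a class was loaded from.
public class ClassOriginCheck {

    // Returns the jar or directory a class comes from, or a short note if unknown.
    public static String originOf(String className) {
        try {
            // Load without initializing, using the current thread's context class loader.
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            Class<?> cls = Class.forName(className, false, loader);
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "no code source (JDK class)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        // Class name taken from the stack trace in this thread.
        System.out.println(originOf(
            "org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer"));
    }
}
```

If the printed location is your own job jar (or your project's class directory), the copied class is the one in use; if it is the connector jar, the copy is being shadowed.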

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-10269




Re: ElasticSearch 6 - error with UpdateRequest

Averell
Hi Timo,

Thanks for your help. I no longer get that error after putting that file
into my project.
However, I don't understand why it helps. I have been using a Flink
binary built on this same laptop, so how can it make a difference whether
that Java class lives in the Flink project or in my project?
If you have some spare time, please explain.

I would also like to know the other way to fix the issue (the one you
implemented in your branch).

Thanks a lot for your help.
Regards,
Averell





Re: ElasticSearch 6 - error with UpdateRequest

Timo Walther
The problem is that BulkProcessorIndexer is located in
flink-connector-elasticsearch-base, which is compiled against a very old
ES version. This old version is source compatible but apparently
not binary compatible with newer Elasticsearch classes. By copying this
class, you force it to be compiled against ES 6, and the old
class in the base module is no longer used.
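
The difference between source and binary compatibility can be illustrated with a small, self-contained sketch. All types below are simplified stand-ins invented for this example, not the real Elasticsearch classes, and the real signatures may differ. The point is that compiled bytecode refers to a method by its exact descriptor (as in the `add(Lorg/elasticsearch/action/ActionRequest;)...` of the error above), so a call that compiles fine against both versions can still fail to link when the class file is reused unchanged. The reflective lookup here matches on exact parameter types, which approximates that behaviour.

```java
// Sketch only: stand-in types, assumed for illustration.
public class BinaryCompatSketch {
    public abstract static class ActionRequest {}
    public interface DocWriteRequest {}
    public static class UpdateRequest extends ActionRequest implements DocWriteRequest {}

    // Stand-in for the old client the base module was compiled against.
    public static class OldBulkProcessor {
        public OldBulkProcessor add(ActionRequest request) { return this; }
    }

    // Stand-in for a newer client: the method exists, but with another parameter type.
    public static class NewBulkProcessor {
        public NewBulkProcessor add(DocWriteRequest request) { return this; }
    }

    // True if 'owner' declares a public add(...) with exactly this parameter type.
    public static boolean links(Class<?> owner, Class<?> parameterType) {
        try {
            owner.getMethod("add", parameterType);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Source compatible: the same call compiles against either version.
        new OldBulkProcessor().add(new UpdateRequest());
        new NewBulkProcessor().add(new UpdateRequest());

        // Not binary compatible: the old exact signature is missing on the new class.
        System.out.println(links(OldBulkProcessor.class, ActionRequest.class));
        System.out.println(links(NewBulkProcessor.class, ActionRequest.class));
    }
}
```

Recompiling the caller against the newer class makes the compiler pick the signature that actually exists, which is what copying the class into the project achieves.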

The fix will improve the API call bridge, as done here [1].

Regards,
Timo

[1] https://github.com/apache/flink/pull/6611
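
As a rough illustration of what a call bridge of this kind could look like (this is not taken from the pull request, and every type and name below is a stand-in assumed for the example): the indexer exposes one method per concrete request type, and a generic entry point dispatches to them, so each version-specific module can implement the typed methods against its own client version.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: stand-in types, not the real Flink or Elasticsearch API.
public class RequestBridgeSketch {
    public abstract static class ActionRequest {}
    public static class IndexRequest extends ActionRequest {}
    public static class UpdateRequest extends ActionRequest {}
    public static class DeleteRequest extends ActionRequest {}

    public interface RequestIndexer {
        void add(IndexRequest request);
        void add(UpdateRequest request);
        void add(DeleteRequest request);

        // Generic entry point: dispatches on the runtime type of the request.
        default void add(ActionRequest request) {
            if (request instanceof IndexRequest) {
                add((IndexRequest) request);
            } else if (request instanceof UpdateRequest) {
                add((UpdateRequest) request);
            } else if (request instanceof DeleteRequest) {
                add((DeleteRequest) request);
            } else {
                throw new IllegalArgumentException("Unsupported request type: " + request);
            }
        }
    }

    // Toy implementation that only records which typed method was called.
    public static class RecordingIndexer implements RequestIndexer {
        public final List<String> calls = new ArrayList<>();
        @Override public void add(IndexRequest request) { calls.add("index"); }
        @Override public void add(UpdateRequest request) { calls.add("update"); }
        @Override public void add(DeleteRequest request) { calls.add("delete"); }
    }

    public static void main(String[] args) {
        RecordingIndexer indexer = new RecordingIndexer();
        ActionRequest request = new UpdateRequest(); // static type is the generic one
        indexer.add(request);
        System.out.println(indexer.calls);
    }
}
```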


In reply to Averell's message of 31.08.18, 09:06.
