Good day everyone,
I tried to send UpdateRequest(s) to ElasticSearch6, and I got the following error: Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor; at org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:76) Below is my ElasticsearchSinkFunction: import org.elasticsearch.action.update.UpdateRequest def upsertRequest(element: T): UpdateRequest = { new UpdateRequest( "myIndex", "record", s"${element.id}") .doc(element.toMap()) } override def process(element: T, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = { requestIndexer.add(upsertRequest(element)) } What could be the issue here? Thanks for your help. Regards, Averell -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi,
thanks for your feedback. I agree that the the current interfaces are not flexible enough to fit to every use case. The unified connector API is a a very recent feature that still needs some polishing. I'm working on a design document to improve the situation there. For now, you can simply implement some utitilty method that just iterates over column names and types of TableSchema and calls `schema.field(name, type)` I hope this helps. Regards, Timo Am 31.08.18 um 08:10 schrieb Averell: > Good day everyone, > > I tried to send UpdateRequest(s) to ElasticSearch6, and I got the following > error: > > Caused by: java.lang.NoSuchMethodError: > org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor; > at > org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:76) > > Below is my ElasticsearchSinkFunction: > > import org.elasticsearch.action.update.UpdateRequest > def upsertRequest(element: T): UpdateRequest = { > new UpdateRequest( > "myIndex", > "record", > s"${element.id}") > .doc(element.toMap()) > } > override def process(element: T, runtimeContext: RuntimeContext, > requestIndexer: RequestIndexer): Unit = { > requestIndexer.add(upsertRequest(element)) > } > > What could be the issue here? > > Thanks for your help. > > Regards, > Averell > > > > -- > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
In reply to this post by Averell
Hi Averell,
sorry for my wrong other mail. I also observed this issue when implementing FLINK-3875. Currently, update requests are broken due to a binary incompatibility. I already have a fix for this in a different branch. I opened FLINK-10269 [1] to track the issue. As a work around you can simply copy org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer to your project. This should ensure that the class is compiled correctly. If it doesn't help, please let us know. Regards, Timo [1] https://issues.apache.org/jira/browse/FLINK-10269 Am 31.08.18 um 08:10 schrieb Averell: > Good day everyone, > > I tried to send UpdateRequest(s) to ElasticSearch6, and I got the following > error: > > Caused by: java.lang.NoSuchMethodError: > org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor; > at > org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:76) > > Below is my ElasticsearchSinkFunction: > > import org.elasticsearch.action.update.UpdateRequest > def upsertRequest(element: T): UpdateRequest = { > new UpdateRequest( > "myIndex", > "record", > s"${element.id}") > .doc(element.toMap()) > } > override def process(element: T, runtimeContext: RuntimeContext, > requestIndexer: RequestIndexer): Unit = { > requestIndexer.add(upsertRequest(element)) > } > > What could be the issue here? > > Thanks for your help. > > Regards, > Averell > > > > -- > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi Timo,
Thanks for your help. I don't get that error anymore after putting that file into my project. However, I don't understand how it could help. I have been using the Flink binary built on my same laptop, then how could it be different between having that java class in Flink project vs in my project? If you have some spare time, please help explain. I also would like to know the other way to fix that issue (that you implemented in your branch). Thanks a lot for your help. Regards, Averell -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
The problem is that BulkProcessorIndexer is located in
flink-connector-elasticsearch-base which is compiled against a very old ES version. This old version is source code compatible but apparently not binary compatible with newer Elasticsearch classes. By copying this class you force to compile the class against ES 6 and don't use the old class in the base module. The fix will include to improve the API call bridge. As done here [1]. Regards, Timo [1] https://github.com/apache/flink/pull/6611 Am 31.08.18 um 09:06 schrieb Averell: > Hi Timo, > > Thanks for your help. I don't get that error anymore after putting that file > into my project. > However, I don't understand how it could help. I have been using the Flink > binary built on my same laptop, then how could it be different between > having that java class in Flink project vs in my project? > If you have some spare time, please help explain. > > I also would like to know the other way to fix that issue (that you > implemented in your branch). > > Thanks a lot for your help. > Regards, > Averell > > > > > -- > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Free forum by Nabble | Edit this page |