Hi everyone,
I'm trying to connect my flink streaming job to elastic search but I have trouble to make it work. Here is the config I'm using for the connector: HashMap<String,String> elConf = new HashMap<>(); I double checked the address and I can reach it from my machine. Though when I run the code I get the following exception:
Am I missing some configuration here? Or what could be the problem? Flink version is 0.10.1 ElasticSearch version is 1.7.1 cheers Martin |
Hi Martin,
From a quick look into the source code, it seems like the nodes are not necessarily available after the TransportClient has been created. The sampling may take several attempts and the check immediately after the first try is a bit restrictive. Nevertheless, if this happens consistently, then I think your configuration is not correct. Is the port 9200 correct? I think it is 9300 for the Java API, 9200 is for the REST protocol. Cheers, Max On Tue, Dec 15, 2015 at 4:42 PM, Martin Neumann <[hidden email]> wrote: > Hi everyone, > > I'm trying to connect my flink streaming job to elastic search but I have > trouble to make it work. > Here is the config I'm using for the connector: > > HashMap<String,String> elConf = new HashMap<>(); > elConf.put("bulk.flush.max.actions", "1"); > elConf.put("cluster.name", "logelask"); > List<TransportAddress> transports = new ArrayList<>(); > > transports.add(new InetSocketTransportAddress(address, 9200)); > > I double checked the address and I can reach it from my machine. Though when > I run the code I get the following exception: > > java.lang.RuntimeException: Client is not connected to any Elasticsearch > nodes! > at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink.open(ElasticsearchSink.java:209) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:286) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:213) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) > at java.lang.Thread.run(Thread.java:745) > > > Am I missing some configuration here? Or what could be the problem? > Flink version is 0.10.1 ElasticSearch version is 1.7.1 > > cheers Martin |
Free forum by Nabble | Edit this page |