elasticsearch sink can't connect to elastic cluster with BasicAuth

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

elasticsearch sink can't connect to elastic cluster with BasicAuth

hzyuemeng1

after i install x-pack in my elasticsearch cluster and the elasticsearch cluster with basicauth
the elasticsearch sink can't connect to elastic cluster

code like:

DataStream<Tuple2<Boolean, Row>> esSink27 = tableEnv13.toRetractStream(esTable26, Row.class).filter( tuple -> tuple.f0);
//generate user config map
java.util.Map<String, String> userConfigMap22 = com.google.common.collect.Maps.newHashMap();
userConfigMap22.put("cluster.name", "test-magina");
userConfigMap22.put("bulk.flush.max.actions", "1");
//userConfigMap22.put("shield.user", "elastic:magina1001password");

//generate transports list
Splitter commaSplitter24 = Splitter.on(",");
Splitter colonSplitter25 = Splitter.on(":");
List<InetSocketAddress> transportsList23 = Lists.newArrayList();
for (String transport : commaSplitter24.split("101.206.91.118:9300")) {
    List<String> ipAndPort = colonSplitter25.splitToList(transport);
    transportsList23.add(new InetSocketAddress(InetAddress.getByName(ipAndPort.get(0)), Integer.parseInt(ipAndPort.get(1))));
}
esSink27.addSink(new ElasticsearchSink<Tuple2<Boolean, Row>>(userConfigMap22, transportsList23, new MaginaES5SinkFunction(esTable26.getSchema().getColumnNames(), "userid", "test-au", "test-au", "action,num"), new RetryRejectedExecutionFailureHandler())).name("elasticsearch_4068").setParallelism(2);

  • Any help will be greatly appreciated


Reply | Threaded
Open this post in threaded view
|

Re: elasticsearch sink can't connect to elastic cluster with BasicAuth

Till Rohrmann
Hi,

I think you need to a custom `RestClientFactory` which enables basic auth on the ElasticSearch RestClient according to this documentation [1]. You can set the RestClientFactory on the ElasticsearchSink.Builder.


Cheers,
Till

On Thu, Nov 22, 2018 at 9:36 AM hzyuemeng1 <[hidden email]> wrote:

after i install x-pack in my elasticsearch cluster and the elasticsearch cluster with basicauth
the elasticsearch sink can't connect to elastic cluster

code like:

DataStream<Tuple2<Boolean, Row>> esSink27 = tableEnv13.toRetractStream(esTable26, Row.class).filter( tuple -> tuple.f0);
//generate user config map
java.util.Map<String, String> userConfigMap22 = com.google.common.collect.Maps.newHashMap();
userConfigMap22.put("cluster.name", "test-magina");
userConfigMap22.put("bulk.flush.max.actions", "1");
//userConfigMap22.put("shield.user", "elastic:magina1001password");

//generate transports list
Splitter commaSplitter24 = Splitter.on(",");
Splitter colonSplitter25 = Splitter.on(":");
List<InetSocketAddress> transportsList23 = Lists.newArrayList();
for (String transport : commaSplitter24.split("101.206.91.118:9300")) {
    List<String> ipAndPort = colonSplitter25.splitToList(transport);
    transportsList23.add(new InetSocketAddress(InetAddress.getByName(ipAndPort.get(0)), Integer.parseInt(ipAndPort.get(1))));
}
esSink27.addSink(new ElasticsearchSink<Tuple2<Boolean, Row>>(userConfigMap22, transportsList23, new MaginaES5SinkFunction(esTable26.getSchema().getColumnNames(), "userid", "test-au", "test-au", "action,num"), new RetryRejectedExecutionFailureHandler())).name("elasticsearch_4068").setParallelism(2);

  • Any help will be greatly appreciated