After I installed X-Pack in my Elasticsearch cluster and enabled basic auth on it, the Elasticsearch sink can no longer connect to the cluster. The code looks like this:

    DataStream<Tuple2<Boolean, Row>> esSink27 =
        tableEnv13.toRetractStream(esTable26, Row.class).filter(tuple -> tuple.f0);

    //generate user config map
    java.util.Map<String, String> userConfigMap22 = com.google.common.collect.Maps.newHashMap();
    userConfigMap22.put("cluster.name", "test-magina");
    userConfigMap22.put("bulk.flush.max.actions", "1");
    //userConfigMap22.put("shield.user", "elastic:magina1001password");

    //generate transports list
    Splitter commaSplitter24 = Splitter.on(",");
    Splitter colonSplitter25 = Splitter.on(":");
    List<InetSocketAddress> transportsList23 = Lists.newArrayList();
    for (String transport : commaSplitter24.split("101.206.91.118:9300")) {
        List<String> ipAndPort = colonSplitter25.splitToList(transport);
        transportsList23.add(new InetSocketAddress(
            InetAddress.getByName(ipAndPort.get(0)),
            Integer.parseInt(ipAndPort.get(1))));
    }

    esSink27.addSink(new ElasticsearchSink<Tuple2<Boolean, Row>>(
            userConfigMap22,
            transportsList23,
            new MaginaES5SinkFunction(esTable26.getSchema().getColumnNames(),
                "userid", "test-au", "test-au", "action,num"),
            new RetryRejectedExecutionFailureHandler()))
        .name("elasticsearch_4068")
        .setParallelism(2);
|
Hi,

I think you need a custom `RestClientFactory` that enables basic auth on the Elasticsearch RestClient, as described in this documentation [1]. You can set the RestClientFactory on the ElasticsearchSink.Builder.

[1] https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html

Cheers,
Till

On Thu, Nov 22, 2018 at 9:36 AM hzyuemeng1 <[hidden email]> wrote:
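
To make the suggestion in the reply above more concrete, here is a minimal sketch of what basic auth amounts to on the REST side: an `Authorization` header carrying the Base64-encoded `user:password` pair. The class `BasicAuthHeader` and its method are hypothetical names introduced for illustration, and the credentials are placeholders. The wiring shown in the comments assumes the REST-based Flink Elasticsearch connector (the one that provides `ElasticsearchSink.Builder` and `RestClientFactory`) talking to the cluster's HTTP port, not the transport-client constructor used in the original code. It sets a default header rather than the credentials provider shown in the linked documentation, so treat it as one possible approach, not the definitive one.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper (not part of Flink or Elasticsearch): builds the value of
// the HTTP "Authorization" header used by basic authentication.
final class BasicAuthHeader {

    private BasicAuthHeader() {
    }

    // Returns "Basic " followed by the Base64 encoding of "user:password".
    static String basicAuthHeaderValue(String user, String password) {
        String credentials = user + ":" + password;
        String encoded = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Placeholder credentials; substitute the real X-Pack user and password.
        System.out.println(basicAuthHeaderValue("elastic", "changeme"));
    }

    // Assumed wiring inside a custom RestClientFactory (requires the REST-based
    // Flink connector and the Apache HttpComponents classes on the classpath;
    // "esSinkBuilder" is a hypothetical ElasticsearchSink.Builder instance):
    //
    //   esSinkBuilder.setRestClientFactory(restClientBuilder ->
    //       restClientBuilder.setDefaultHeaders(new Header[] {
    //           new BasicHeader("Authorization",
    //               BasicAuthHeader.basicAuthHeaderValue(user, password))
    //       }));
}
```

Keeping the header construction in a small standalone method makes it possible to check the encoding without a running cluster; the factory itself then only has to pass the value on to the `RestClientBuilder`.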