Re:Re: ElasticsearchSink on DataSet

Posted by wyphao.2007 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/ElasticsearchSink-on-DataSet-tp12980p13064.html

Hi Flavio

Maybe this is what you want: https://github.com/397090770/flink-elasticsearch2-connector . It can save a Flink DataSet to Elasticsearch:
import java.net.{InetAddress, InetSocketAddress}

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch2.{ElasticsearchSinkFunction, RequestIndexer}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

import scala.collection.JavaConversions._

val config = Map("bulk.flush.max.actions" -> "1000", "cluster.name" -> "elasticsearch")
val hosts = "www.iteblog.com"
val transports = hosts.split(",").map(host => new InetSocketAddress(InetAddress.getByName(host), 9300)).toList
val data: DataSet[String] = ....

// ElasticSearchOutputFormat comes from the flink-elasticsearch2-connector project above
data.output(new ElasticSearchOutputFormat(config, transports, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    Requests.indexRequest.index("iteblog").`type`("info").source(element)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
    indexer.add(createIndexRequest(element))
  }
}))

I hope this helps.

On 9 May 2017 at 12:59, "Tzu-Li (Gordon) Tai" <[hidden email]> wrote:

Hi Flavio,

I don’t think there is a bridge class for this. At the moment you’ll have to implement your own OutputFormat.
The ElasticsearchSink is a SinkFunction, which is part of the DataStream API; generally speaking, the DataStream API has no bridge or unification with the DataSet API yet.
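A custom OutputFormat along these lines could look like the minimal sketch below. Flink's real interface is org.apache.flink.api.common.io.OutputFormat; it is stubbed here so the snippet is self-contained. The class name ElasticsearchOutputFormat, the flush callback, and the batching logic are illustrative assumptions, not an actual connector API — in a real implementation you would open a TransportClient (or BulkProcessor) in open() and send a bulk request in the flush step.

```scala
// Sketch only: Flink's OutputFormat interface is stubbed here so the example
// compiles on its own. In a real job, implement
// org.apache.flink.api.common.io.OutputFormat[T] instead.
trait OutputFormat[T] {
  def open(taskNumber: Int, numTasks: Int): Unit
  def writeRecord(record: T): Unit
  def close(): Unit
}

// Hypothetical Elasticsearch output format: buffers records and flushes them
// in batches, mirroring the sink's bulk.flush.max.actions setting.
class ElasticsearchOutputFormat[T](
    bulkFlushMaxActions: Int,
    flush: Seq[T] => Unit // assumption: stands in for a bulk request to the cluster
) extends OutputFormat[T] {

  private val buffer = scala.collection.mutable.ArrayBuffer.empty[T]

  override def open(taskNumber: Int, numTasks: Int): Unit =
    buffer.clear() // real code would also create the Elasticsearch client here

  override def writeRecord(record: T): Unit = {
    buffer += record
    if (buffer.size >= bulkFlushMaxActions) flushBuffer()
  }

  override def close(): Unit =
    flushBuffer() // flush whatever is left, then close the client in real code

  private def flushBuffer(): Unit =
    if (buffer.nonEmpty) {
      flush(buffer.toSeq)
      buffer.clear()
    }
}
```

With the real interface in place you could then write `dataSet.output(new ElasticsearchOutputFormat(...))`, just as with any other DataSet OutputFormat.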

Cheers,
Gordon


On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier ([hidden email]) wrote:


Hi to all,
at the moment I have a Flink job that generates a DataSet<String> which I write to a file; Logstash then reads that file to index the data in Elasticsearch.
I'd like to use the new ElasticsearchSink to index those JSON documents directly from Flink, but ElasticsearchSink only works with the streaming environment.

Is there any bridge class for this?

Best,
Flavio