ElasticsearchSink on DataSet

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

ElasticsearchSink on DataSet

Flavio Pompermaier

Hi to all,
at the moment I have a Flink Job that generates a DataSet<String> that I write to a File that is read by Logstash to index data on ES.
I'd like to use the new ElasticsearchSink to index those JSON directly from Flink but ElasticsearchSink only works with streaming environment.

Is there any bridge class for this?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: ElasticsearchSink on DataSet

Tzu-Li (Gordon) Tai
Hi Flavio,

I don’t think there is a bridge class for this. At the moment you’ll have to implement your own OutputFormat.
The ElasticsearchSink is a SinkFunction which is part of the DataStream API, which generally speaking at the moment has no bridge or unification yet with the DataSet API.

Cheers,
Gordon


On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier ([hidden email]) wrote:


Hi to all,
at the moment I have a Flink Job that generates a DataSet<String> that I write to a File that is read by Logstash to index data on ES.
I'd like to use the new ElasticsearchSink to index those JSON directly from Flink but ElasticsearchSink only works with streaming environment.

Is there any bridge class for this?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re:Re: ElasticsearchSink on DataSet

wyphao.2007
Hi Flavio

Maybe this is what you want: https://github.com/397090770/flink-elasticsearch2-connectorIt can save Flink DataSet to elasticsearch.
import scala.collection.JavaConversions._
val config = Map("bulk.flush.max.actions" -> "1000", "cluster.name" -> "elasticsearch")val hosts = "www.iteblog.com"val transports = hosts.split(",").map(host => new InetSocketAddress(InetAddress.getByName(host), 9300)).toListval data : DataSet[String] = ....
data.output(new ElasticSearchOutputFormat(config, transports, new ElasticsearchSinkFunction[String] {      def createIndexRequest(element: String): IndexRequest = {        Requests.indexRequest.index("iteblog").`type`("info").source(element)
      }      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
        indexer.add(createIndexRequest(element))
      }
}))

I hope this could help you

在2017年05月09 12时59分, "Tzu-Li (Gordon) Tai"<[hidden email]>写道:

Hi Flavio,

I don’t think there is a bridge class for this. At the moment you’ll have to implement your own OutputFormat.
The ElasticsearchSink is a SinkFunction which is part of the DataStream API, which generally speaking at the moment has no bridge or unification yet with the DataSet API.

Cheers,
Gordon


On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier ([hidden email]) wrote:


Hi to all,
at the moment I have a Flink Job that generates a DataSet<String> that I write to a File that is read by Logstash to index data on ES.
I'd like to use the new ElasticsearchSink to index those JSON directly from Flink but ElasticsearchSink only works with streaming environment.

Is there any bridge class for this?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re:Re: ElasticsearchSink on DataSet

Tzu-Li (Gordon) Tai
Hi!

Thanks for sharing that repo! I think that would be quite an useful contribution to Flink for the users, if you’re up to preparing a PR for it :)

It also looks like you’ve adopted most of the current ElasticsearchSink APIs (RequestIndexer, ElasticsearchSinkFunction, etc.) for the ElasticsearchOutputFormat, which is nice to fit into the current code :-D

Cheers,
Gordon


On 9 May 2017 at 1:05:14 PM, wyphao.2007 ([hidden email]) wrote:

Hi Flavio

Maybe this is what you want: https://github.com/397090770/flink-elasticsearch2-connectorIt can save Flink DataSet to elasticsearch.
import scala.collection.JavaConversions._
val config = Map("bulk.flush.max.actions" -> "1000", "cluster.name" -> "elasticsearch")val hosts = "www.iteblog.com"val transports = hosts.split(",").map(host => new InetSocketAddress(InetAddress.getByName(host), 9300)).toListval data : DataSet[String] = ....
data.output(new ElasticSearchOutputFormat(config, transports, new ElasticsearchSinkFunction[String] {      def createIndexRequest(element: String): IndexRequest = {        Requests.indexRequest.index("iteblog").`type`("info").source(element)
      }      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
        indexer.add(createIndexRequest(element))
      }
}))

I hope this could help you

在2017年05月09 12时59分, "Tzu-Li (Gordon) Tai"<[hidden email]>写道:

Hi Flavio,

I don’t think there is a bridge class for this. At the moment you’ll have to implement your own OutputFormat.
The ElasticsearchSink is a SinkFunction which is part of the DataStream API, which generally speaking at the moment has no bridge or unification yet with the DataSet API.

Cheers,
Gordon


On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier ([hidden email]) wrote:


Hi to all,
at the moment I have a Flink Job that generates a DataSet<String> that I write to a File that is read by Logstash to index data on ES.
I'd like to use the new ElasticsearchSink to index those JSON directly from Flink but ElasticsearchSink only works with streaming environment.

Is there any bridge class for this?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re:Re: ElasticsearchSink on DataSet

Flavio Pompermaier
Thanks a lot for the support!

On 9 May 2017 07:53, "Tzu-Li (Gordon) Tai" <[hidden email]> wrote:
Hi!

Thanks for sharing that repo! I think that would be quite an useful contribution to Flink for the users, if you’re up to preparing a PR for it :)

It also looks like you’ve adopted most of the current ElasticsearchSink APIs (RequestIndexer, ElasticsearchSinkFunction, etc.) for the ElasticsearchOutputFormat, which is nice to fit into the current code :-D

Cheers,
Gordon


On 9 May 2017 at 1:05:14 PM, wyphao.2007 ([hidden email]) wrote:

Hi Flavio

Maybe this is what you want: https://github.com/397090770/flink-elasticsearch2-connectorIt can save Flink DataSet to elasticsearch.
import scala.collection.JavaConversions._
val config = Map("bulk.flush.max.actions" -> "1000", "cluster.name" -> "elasticsearch")val hosts = "www.iteblog.com"val transports = hosts.split(",").map(host => new InetSocketAddress(InetAddress.getByName(host), 9300)).toListval data : DataSet[String] = ....
data.output(new ElasticSearchOutputFormat(config, transports, new ElasticsearchSinkFunction[String] {      def createIndexRequest(element: String): IndexRequest = {        Requests.indexRequest.index("iteblog").`type`("info").source(element)
      }      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
        indexer.add(createIndexRequest(element))
      }
}))

I hope this could help you

在2017年05月09 12时59分, "Tzu-Li (Gordon) Tai"<[hidden email]>写道:

Hi Flavio,

I don’t think there is a bridge class for this. At the moment you’ll have to implement your own OutputFormat.
The ElasticsearchSink is a SinkFunction which is part of the DataStream API, which generally speaking at the moment has no bridge or unification yet with the DataSet API.

Cheers,
Gordon


On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier ([hidden email]) wrote:


Hi to all,
at the moment I have a Flink Job that generates a DataSet<String> that I write to a File that is read by Logstash to index data on ES.
I'd like to use the new ElasticsearchSink to index those JSON directly from Flink but ElasticsearchSink only works with streaming environment.

Is there any bridge class for this?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Re: ElasticsearchSink on DataSet

Flavio Pompermaier
Just one note: I took a look at your connector and it doesn't provide any failure handling mechanism that is very useful for us.
Maybe it could worth to add ActionRequestFailureHandler as provided now by the current ES streaming connector and introduced by commit https://github.com/apache/flink/commit/3743e898104d79a9813d444d38fa9f86617bb5ef

Best,
Flavio

On Tue, May 9, 2017 at 8:17 AM, Flavio Pompermaier <[hidden email]> wrote:
Thanks a lot for the support!

On 9 May 2017 07:53, "Tzu-Li (Gordon) Tai" <[hidden email]> wrote:
Hi!

Thanks for sharing that repo! I think that would be quite an useful contribution to Flink for the users, if you’re up to preparing a PR for it :)

It also looks like you’ve adopted most of the current ElasticsearchSink APIs (RequestIndexer, ElasticsearchSinkFunction, etc.) for the ElasticsearchOutputFormat, which is nice to fit into the current code :-D

Cheers,
Gordon


On 9 May 2017 at 1:05:14 PM, wyphao.2007 ([hidden email]) wrote:

Hi Flavio

Maybe this is what you want: https://github.com/397090770/flink-elasticsearch2-connector, It can save Flink DataSet to elasticsearch.
import scala.collection.JavaConversions._
val config = Map("bulk.flush.max.actions" -> "1000", "cluster.name" -> "elasticsearch")val hosts = "www.iteblog.com"val transports = hosts.split(",").map(host => new InetSocketAddress(InetAddress.getByName(host), 9300)).toListval data : DataSet[String] = ....
data.output(new ElasticSearchOutputFormat(config, transports, new ElasticsearchSinkFunction[String] {      def createIndexRequest(element: String): IndexRequest = {        Requests.indexRequest.index("iteblog").`type`("info").source(element)
      }      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
        indexer.add(createIndexRequest(element))
      }
}))

I hope this could help you

在2017年05月09 12时59分, "Tzu-Li (Gordon) Tai"<[hidden email]>写道:

Hi Flavio,

I don’t think there is a bridge class for this. At the moment you’ll have to implement your own OutputFormat.
The ElasticsearchSink is a SinkFunction which is part of the DataStream API, which generally speaking at the moment has no bridge or unification yet with the DataSet API.

Cheers,
Gordon


On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier ([hidden email]) wrote:


Hi to all,
at the moment I have a Flink Job that generates a DataSet<String> that I write to a File that is read by Logstash to index data on ES.
I'd like to use the new ElasticsearchSink to index those JSON directly from Flink but ElasticsearchSink only works with streaming environment.

Is there any bridge class for this?

Best,
Flavio



--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: ElasticsearchSink on DataSet

wyphao.2007
Hi Flavio, I made a PR for this : https://github.com/apache/flink/pull/3869 
And it also support ActionRequestFailureHandler in DataSet's ElasticsearchSink

Best

在2017年05月09 15时30分, "Flavio Pompermaier"<[hidden email]>写道:

Just one note: I took a look at your connector and it doesn't provide any failure handling mechanism that is very useful for us.
Maybe it could worth to add ActionRequestFailureHandler as provided now by the current ES streaming connector and introduced by commit https://github.com/apache/flink/commit/3743e898104d79a9813d444d38fa9f86617bb5ef

Best,
Flavio

On Tue, May 9, 2017 at 8:17 AM, Flavio Pompermaier <[hidden email]> wrote:
Thanks a lot for the support!

On 9 May 2017 07:53, "Tzu-Li (Gordon) Tai" <[hidden email]> wrote:
Hi!

Thanks for sharing that repo! I think that would be quite an useful contribution to Flink for the users, if you’re up to preparing a PR for it :)

It also looks like you’ve adopted most of the current ElasticsearchSink APIs (RequestIndexer, ElasticsearchSinkFunction, etc.) for the ElasticsearchOutputFormat, which is nice to fit into the current code :-D

Cheers,
Gordon


On 9 May 2017 at 1:05:14 PM, wyphao.2007 ([hidden email]) wrote:

Hi Flavio

Maybe this is what you want: https://github.com/397090770/flink-elasticsearch2-connector, It can save Flink DataSet to elasticsearch.
import scala.collection.JavaConversions._
val config = Map("bulk.flush.max.actions" -> "1000", "cluster.name" -> "elasticsearch")val hosts = "www.iteblog.com"val transports = hosts.split(",").map(host => new InetSocketAddress(InetAddress.getByName(host), 9300)).toListval data : DataSet[String] = ....
data.output(new ElasticSearchOutputFormat(config, transports, new ElasticsearchSinkFunction[String] {      def createIndexRequest(element: String): IndexRequest = {        Requests.indexRequest.index("iteblog").`type`("info").source(element)
      }      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
        indexer.add(createIndexRequest(element))
      }
}))

I hope this could help you

在2017年05月09 12时59分, "Tzu-Li (Gordon) Tai"<[hidden email]>写道:

Hi Flavio,

I don’t think there is a bridge class for this. At the moment you’ll have to implement your own OutputFormat.
The ElasticsearchSink is a SinkFunction which is part of the DataStream API, which generally speaking at the moment has no bridge or unification yet with the DataSet API.

Cheers,
Gordon


On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier ([hidden email]) wrote:


Hi to all,
at the moment I have a Flink Job that generates a DataSet<String> that I write to a File that is read by Logstash to index data on ES.
I'd like to use the new ElasticsearchSink to index those JSON directly from Flink but ElasticsearchSink only works with streaming environment.

Is there any bridge class for this?

Best,
Flavio



--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908
Reply | Threaded
Open this post in threaded view
|

Re: Re: Re: ElasticsearchSink on DataSet

Flavio Pompermaier
Great! I was just thinking that, in principle, a streaming sink is an extension of a batch one. Am I wrong?
This would avoid a lot of code duplication and would improve the overall maintainability..

On Thu, May 11, 2017 at 4:35 AM, wyphao.2007 <[hidden email]> wrote:
Hi Flavio, I made a PR for this : https://github.com/apache/flink/pull/3869 
And it also support ActionRequestFailureHandler in DataSet's ElasticsearchSink

Best

在2017年05月09 15时30分, "Flavio Pompermaier"<[hidden email]>写道:

Just one note: I took a look at your connector and it doesn't provide any failure handling mechanism that is very useful for us.
Maybe it could worth to add ActionRequestFailureHandler as provided now by the current ES streaming connector and introduced by commit https://github.com/apache/flink/commit/3743e898104d79a9813d444d38fa9f86617bb5ef

Best,
Flavio

On Tue, May 9, 2017 at 8:17 AM, Flavio Pompermaier <[hidden email]> wrote:
Thanks a lot for the support!

On 9 May 2017 07:53, "Tzu-Li (Gordon) Tai" <[hidden email]> wrote:
Hi!

Thanks for sharing that repo! I think that would be quite an useful contribution to Flink for the users, if you’re up to preparing a PR for it :)

It also looks like you’ve adopted most of the current ElasticsearchSink APIs (RequestIndexer, ElasticsearchSinkFunction, etc.) for the ElasticsearchOutputFormat, which is nice to fit into the current code :-D

Cheers,
Gordon


On 9 May 2017 at 1:05:14 PM, wyphao.2007 ([hidden email]) wrote:

Hi Flavio

Maybe this is what you want: https://github.com/397090770/flink-elasticsearch2-connector, It can save Flink DataSet to elasticsearch.
import scala.collection.JavaConversions._
val config = Map("bulk.flush.max.actions" -> "1000", "cluster.name" -> "elasticsearch")val hosts = "www.iteblog.com"val transports = hosts.split(",").map(host => new InetSocketAddress(InetAddress.getByName(host), 9300)).toListval data : DataSet[String] = ....
data.output(new ElasticSearchOutputFormat(config, transports, new ElasticsearchSinkFunction[String] {      def createIndexRequest(element: String): IndexRequest = {        Requests.indexRequest.index("iteblog").`type`("info").source(element)
      }      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
        indexer.add(createIndexRequest(element))
      }
}))

I hope this could help you

在2017年05月09 12时59分, "Tzu-Li (Gordon) Tai"<[hidden email]>写道:

Hi Flavio,

I don’t think there is a bridge class for this. At the moment you’ll have to implement your own OutputFormat.
The ElasticsearchSink is a SinkFunction which is part of the DataStream API, which generally speaking at the moment has no bridge or unification yet with the DataSet API.

Cheers,
Gordon


On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier ([hidden email]) wrote:


Hi to all,
at the moment I have a Flink Job that generates a DataSet<String> that I write to a File that is read by Logstash to index data on ES.
I'd like to use the new ElasticsearchSink to index those JSON directly from Flink but ElasticsearchSink only works with streaming environment.

Is there any bridge class for this?

Best,
Flavio



--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. <a href="tel:+39%200461%20182%203908" value="+3904611823908" target="_blank">+(39) 0461 1823908



--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908