Flink Elastic Sink


anuj.aj07
Hello All,

I am receiving many events in Kafka, and I have written a Flink job that sinks the Avro records from Kafka to S3 in Parquet format.

Now I want to sink these records into Elasticsearch as well, but the challenge is that I want to write the records into time-based indices. Basically, in Elasticsearch I want to create a per-day index with the date as the suffix.
So, if I create an ES sink in the Flink streaming job, how do I make the sink start writing to a new index when the first event of the day arrives?

Thanks,
Anuj.


Re: Flink Elastic Sink

Yangze Guo
Hi, Anuj.

From my understanding, you could send an IndexRequest to the indexer in
`ElasticsearchSink`. It will create a document under the given index
and type. So it seems you only need to get the timestamp and append
the `date` to your index name. Am I understanding that correctly? Or
do you want to emit only one record per day?
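
For illustration, a minimal sketch of such a sink (DataStream API, using the elasticsearch7 connector module; `MyEvent` is a hypothetical POJO with an epoch-millis `getTimestamp()` and a `toMap()` that returns the document body as a `Map` — host, index prefix, and event type are placeholders, so adapt them and pick the connector module matching your cluster version):

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Collections;

DataStream<MyEvent> stream = ...; // the Avro-derived event stream from Kafka

ElasticsearchSink.Builder<MyEvent> builder = new ElasticsearchSink.Builder<>(
        Collections.singletonList(new HttpHost("localhost", 9200, "http")),
        new ElasticsearchSinkFunction<MyEvent>() {
            @Override
            public void process(MyEvent event, RuntimeContext ctx, RequestIndexer indexer) {
                // Derive the per-day index name from the event timestamp;
                // LocalDate.toString() yields yyyy-MM-dd.
                String day = Instant.ofEpochMilli(event.getTimestamp())
                        .atZone(ZoneOffset.UTC)
                        .toLocalDate()
                        .toString();
                IndexRequest request = Requests.indexRequest()
                        .index("myindex_" + day)  // e.g. myindex_2020-05-29
                        .source(event.toMap());   // document body
                indexer.add(request);
            }
        });
builder.setBulkFlushMaxActions(1000); // flush a bulk request every 1000 actions
stream.addSink(builder.build());
```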

Best,
Yangze Guo


Re: Flink Elastic Sink

Leonard Xu
In reply to this post by anuj.aj07
Hi, aj

In its implementation, ElasticsearchSink won't create the index; it only starts an Elasticsearch client that sends requests to the Elasticsearch cluster. You can simply extract the index name (the date value, in your case) from your timestamp field and put it into an IndexRequest; ElasticsearchSink will send the IndexRequests to the Elasticsearch cluster, and the cluster will create the corresponding index and flush the records.

BTW, if you're using Flink SQL, you can use a dynamic index in the Elasticsearch SQL connector: simply configure 'connector.index' = 'myindex_{ts_field|yyyy-MM-dd}' to achieve your goal, as in the sketch below.
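
A sketch of such a DDL, registered from Java (table name, columns, host, and the other connector properties are illustrative Flink 1.10-era descriptor keys, and `tableEnv` is an assumed TableEnvironment — only the 'connector.index' pattern is the one described above):

```java
// Register an Elasticsearch sink table whose index is resolved per row:
// each row lands in myindex_<yyyy-MM-dd of ts_field>.
tableEnv.sqlUpdate(
    "CREATE TABLE es_sink (" +
    "  event_id STRING," +
    "  ts_field TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector.type' = 'elasticsearch'," +
    "  'connector.version' = '6'," +
    "  'connector.hosts' = 'http://localhost:9200'," +
    "  'connector.index' = 'myindex_{ts_field|yyyy-MM-dd}'," +
    "  'connector.document-type' = '_doc'," +
    "  'update-mode' = 'append'," +
    "  'format.type' = 'json'" +
    ")");
```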

Best,
Leonard Xu




Re: Flink Elastic Sink

anuj.aj07
Thanks, it worked.

I was confused before: I was thinking the sink builder is called only once, but it seems to get called for every batch request. Correct me if my understanding is wrong.


Re: Flink Elastic Sink

Leonard Xu
Hi, aj

> I was confused before: I was thinking the sink builder is called only once, but it seems to get called for every batch request. Correct me if my understanding is wrong.

You're right: the sink builder should be called only once, not for every batch request. Could you post a snippet of the code where you use the sink? The sketch below shows the expected lifecycle.
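
To illustrate (building on the DataStream sketch in the earlier reply, so `builder`, `stream`, and `MyEvent` are the same assumed names):

```java
// Expected lifecycle of the sink:
//  - The Builder and its ElasticsearchSinkFunction are constructed once,
//    when the job is assembled and submitted.
//  - Each parallel sink subtask then gets one deserialized copy of the
//    function at job startup.
//  - Only process() runs once per record, which is why the index name
//    can change per record (and therefore per day) at runtime.
ElasticsearchSink<MyEvent> sink = builder.build(); // happens once, at job build time
stream.addSink(sink);                              // process() is invoked per record
```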

Best,
Leonard Xu


