Hello All,
I am getting many events in Kafka, and I have written a Flink job that sinks the Avro records from Kafka to S3 in Parquet format. Now I want to sink these records into Elasticsearch as well, but the challenge is that I want to write to time-based indices. Basically, in Elasticsearch I want to create a per-day index with the date as the suffix. So if I create an ES sink in the Flink streaming job, how do I make the sink start writing to a new index when the first event of the day arrives?

Thanks,
Anuj.
Hi, Anuj.
From my understanding, you can send an IndexRequest to the indexer in `ElasticsearchSink`. It will create a document under the given index and type. So it seems you only need to extract the timestamp and concatenate the date onto your index name. Am I understanding that correctly? Or do you want to emit only one record per day?

Best,
Yangze Guo

On Fri, May 29, 2020 at 2:43 AM aj <[hidden email]> wrote:
Hi, aj

In the implementation of `ElasticsearchSink`, the sink won't create the index itself; it only starts an Elasticsearch client for sending requests to the Elasticsearch cluster. You can simply extract the index name (the date value, in your case) from your timestamp field and put it into an IndexRequest [1]. `ElasticsearchSink` will send the IndexRequests to the Elasticsearch cluster, and the cluster will create the corresponding index and flush the records.

BTW, if you're using Flink SQL, you can use a dynamic index in the Elasticsearch SQL connector [2]: simply configure 'connector.index' = 'myindex_{ts_field|yyyy-MM-dd}' to achieve your goal.

Best,
Leonard Xu
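A minimal sketch of the first approach, extracting a per-day index name from the event timestamp. The class name `DailyIndex`, the `events_` prefix, and the placeholder event type in the comment are assumptions for illustration, not part of the actual connector API; the key point is that the index name is derived from each record's timestamp.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DailyIndex {
    // Formats an epoch-millis timestamp as a per-day index suffix (UTC assumed).
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    public static String indexFor(long epochMillis) {
        return "events_" + DAY.format(Instant.ofEpochMilli(epochMillis));
    }

    // Inside a Flink ElasticsearchSinkFunction this would be used roughly as:
    //
    //   public void process(MyEvent e, RuntimeContext ctx, RequestIndexer indexer) {
    //       indexer.add(Requests.indexRequest()
    //           .index(DailyIndex.indexFor(e.getTimestamp()))
    //           .source(e.toJsonMap()));
    //   }
    //
    // (MyEvent, getTimestamp(), and toJsonMap() are hypothetical placeholders.)

    public static void main(String[] args) {
        System.out.println(indexFor(1590710400000L)); // 2020-05-29T00:00:00Z
    }
}
```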
Thanks, it worked. I was confused before, as I was thinking the sink builder is called only once, but it seems to get called for every batch request; correct me if my understanding is wrong.

On Fri, May 29, 2020 at 9:08 AM Leonard Xu <[hidden email]> wrote:
Hi, aj

You're right that the sink builder should be called only once, rather than for every batch request. Could you post a snippet of the code using the sink?

Best,
Leonard Xu
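To illustrate the lifecycle point, here is a pure-Java stand-in (not the real connector API; the class name, `events_` prefix, and the counting logic are assumptions): the sink function is constructed once, but its `process()` hook runs for every record, so a per-record index computation rolls over to a new daily index automatically when the first event of the next day arrives.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

public class SinkLifecycleSketch {
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // index name -> number of documents "written" to it
    final Map<String, Integer> writtenPerIndex = new LinkedHashMap<>();

    // Mirrors ElasticsearchSinkFunction.process(): invoked once per record.
    void process(long eventTimeMillis) {
        String index = "events_" + DAY.format(Instant.ofEpochMilli(eventTimeMillis));
        writtenPerIndex.merge(index, 1, Integer::sum);
    }

    public static void main(String[] args) {
        SinkLifecycleSketch sink = new SinkLifecycleSketch(); // built once
        long may29 = 1590710400000L;          // 2020-05-29T00:00:00Z
        sink.process(may29);
        sink.process(may29 + 3_600_000L);     // later the same day
        sink.process(may29 + 86_400_000L);    // first event of the next day
        System.out.println(sink.writtenPerIndex);
        // {events_2020-05-29=2, events_2020-05-30=1}
    }
}
```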