Re: ElasticSearch Connector


Dawid Wysakowicz-2

Hi Ramya,

I think the problem is that you access the serializationSchema from the closure of the ElasticsearchSinkFunction. Try creating an ElasticsearchSinkFunction that receives the serializationSchema in its constructor. If this is not the problem, could you share the full stack trace of the error?
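To illustrate the suggestion, here is a minimal sketch in plain Java serialization, without any Flink dependency. All types in it (DocSchema, SinkFn, Utf8Schema, SchemaSinkFn, JobSetup, SerializationCheck) are hypothetical stand-ins and not Flink classes. It only shows why a function that reaches a field through its enclosing object can fail to serialize, while one that receives the schema in its constructor does not; whether this is the actual cause of the error on the cluster is an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// All types below are hypothetical stand-ins for illustration, not Flink classes.
interface DocSchema extends Serializable {
    byte[] serialize(String element);
}

interface SinkFn extends Serializable {
    byte[] toDocument(String element);
}

class Utf8Schema implements DocSchema {
    @Override
    public byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }
}

// The schema arrives through the constructor, so the function holds only serializable state.
class SchemaSinkFn implements SinkFn {
    private final DocSchema schema;

    SchemaSinkFn(DocSchema schema) {
        this.schema = schema;
    }

    @Override
    public byte[] toDocument(String element) {
        return schema.serialize(element);
    }
}

// Stand-in for a job class that is itself NOT Serializable.
class JobSetup {
    private final DocSchema schema = new Utf8Schema();

    // The anonymous class reads the outer field, so it keeps a reference to JobSetup.this.
    SinkFn closureStyle() {
        return new SinkFn() {
            @Override
            public byte[] toDocument(String element) {
                return schema.serialize(element);
            }
        };
    }

    // Only the schema is handed over; JobSetup itself is not referenced.
    SinkFn constructorStyle() {
        return new SchemaSinkFn(schema);
    }
}

final class SerializationCheck {
    private SerializationCheck() {}

    // Returns false if Java serialization rejects the object graph.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In this sketch, the function returned by closureStyle() cannot be serialized because it drags the non-serializable JobSetup along, whereas the one returned by constructorStyle() can.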



On 16/01/2019 11:24, Ramya Ramamurthy wrote:
Hi Dawid,

Thanks for your response. I was able to get it working, but only in my IDE. When I deploy the same job to my cluster, I get the error below:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function

I am not sure what differs between the two environments. Is there anything you would suggest to resolve this issue?
I am attaching my sample code.


On Thu, Jan 10, 2019 at 8:46 PM Dawid Wysakowicz <[hidden email]> wrote:


I'm afraid you cannot write to different indices using the Table API ElasticSearch connector. Now I know why you wanted to go through the DataStream API.

To transform a Row to JSON, you could use org.apache.flink.formats.json.JsonRowSerializationSchema from flink-json. You just need to get the schema from the final Table of the Table API part of your job. Your code could look like this:

TypeInformation<Row> schema = table.getSchema().toRowType();

SerializationSchema<Row> serializationSchema = new JsonRowSerializationSchema(schema);

public IndexRequest createIndexRequest(Row element) {
    // Serialize the Row to JSON bytes using the schema taken from the table
    byte[] document = serializationSchema.serialize(element);

    // index, docType and contentType are placeholders to be supplied by your job
    return new IndexRequest(index, docType)
                .source(document, contentType);
}
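Since the goal is one index per day, the index name can be derived from the event timestamp of each element rather than from the time the record is written. The following is a minimal sketch in plain Java; the DailyIndex helper is hypothetical (it is not part of Flink or Elasticsearch), and the ddMMyy suffix is an assumption taken from the "test-ddmmyy" naming in the question.

```java
import java.sql.Timestamp;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration; not part of Flink or Elasticsearch.
final class DailyIndex {
    // ddMMyy mirrors the "test-ddmmyy" naming used in the question (an assumption).
    private static final DateTimeFormatter SUFFIX = DateTimeFormatter.ofPattern("ddMMyy");

    private DailyIndex() {}

    // Builds "<prefix>-<ddMMyy>" from the event timestamp carried by the record.
    static String indexFor(String prefix, Timestamp eventTime) {
        return prefix + "-" + eventTime.toLocalDateTime().format(SUFFIX);
    }
}
```

Inside createIndexRequest, the timestamp could be read from the Row, for example with (Timestamp) element.getField(4) if tumbleStart is the fifth field as in the schema quoted in this thread, and the result passed as the index argument of IndexRequest. Because the name depends only on the event timestamp, a record from yesterday that arrives shortly after midnight would still be routed to yesterday's index.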




On 10/01/2019 10:34, Ramya Ramamurthy wrote:

Sorry, I am a beginner here. I am not really sure how to handle dynamic
indices, i.e. indices of the form .index(test-ddmmyy).
I have set the watermark for my Kafka table source, but I am not sure how this
works with the ElasticSearch sink.

Pasted my sample code below:

tableEnv.connect(new Kafka()
      .property("bootstrap.servers", "localhost:9092"))
      .withSchema(new Schema()
            .field("sid", Types.STRING())
            .field("ip", Types.STRING())
            .field("family", Types.STRING())
            .field("total_hits", Types.LONG())
            .field("tumbleStart", Types.SQL_TIMESTAMP())
            .field("tumbleEnd", Types.SQL_TIMESTAMP()))
      .withFormat(new Json().deriveSchema())

new Elasticsearch()
.host("localhost", 9200, "http")
.index("test")  // How to pass dynamic indices here, based on the packet received from the table sink?

.bulkFlushMaxSize("20 mb")


Thanks again !!

On Thu, Jan 10, 2019 at 2:55 PM miki haiat [hidden email] wrote:

You can use Flink to manipulate the data by using
TimeCharacteristic.EventTime [1] and setting a watermark.
Then, if there is lag or some other issue, the data will still be inserted into the
correct indices in Elasticsearch.
A more specific way to implement this with Kafka is described in [2].


On Thu, Jan 10, 2019 at 11:10 AM Ramya Ramamurthy [hidden email] wrote:

Hi Dawid,

Thanks for the quick reply.
I did try that. I am not sure how to push into rolling indices here.
For example, I maintain daily indices on ES. Based on the event
time, I would like to route the packets to the appropriate indices. If
there was some lag in the source Kafka and I receive yesterday's data
late [say at 00:05 or so], I am not sure how to route it to the right index.
Is there a way to work around this?


On Thu, Jan 10, 2019 at 2:04 PM Dawid Wysakowicz <[hidden email]> wrote:


Hi Ramya,

Have you tried writing to ES directly from table API? You can check the
ES connector for table API here:


On 10/01/2019 09:21, Ramya Ramamurthy wrote:

I am learning Flink. With Flink 1.7.1, I am trying to read from Kafka and
insert into ElasticSearch. I have a Kafka connector that converts the data
to a Flink table. In order to insert into Elasticsearch, I have converted
the table to a datastream, in order to be able to use the Elasticsearch
sink connector. But the Rows returned by the stream have lost the schema.
How do I convert this to JSON before calling the Elasticsearch sink
connector? Any suggestions would be appreciated.

