Hi Ramya,
I think the problem is that you access the serializationSchema
from the closure of the ElasticsearchSinkFunction. Try creating an
ElasticsearchSinkFunction that gets the serializationSchema in its
constructor, e.g. along the lines of the sketch below. If this is not
the problem, could you share the full stack trace of the error?
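A rough, untested sketch of what I mean (the class and field names are
just placeholders, and it assumes the flink-connector-elasticsearch6
classes):

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.types.Row;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

// All fields are serializable and set via the constructor, so nothing is
// captured from the closure of an enclosing (possibly non-serializable) class.
public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<Row> {

    private final String index;     // e.g. "test"
    private final String docType;   // e.g. "user"
    private final SerializationSchema<Row> serializationSchema;

    public RowElasticsearchSinkFunction(
            String index,
            String docType,
            SerializationSchema<Row> serializationSchema) {
        this.index = index;
        this.docType = docType;
        this.serializationSchema = serializationSchema;
    }

    @Override
    public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
        byte[] document = serializationSchema.serialize(element);
        indexer.add(new IndexRequest(index, docType)
                .source(document, XContentType.JSON));
    }
}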
Best,
Dawid
On 16/01/2019 11:24, Ramya Ramamurthy wrote:
Hi Dawid,
Thanks for your response. I was able to make it work, but only in my IDE.
When I deploy the same job to my cluster, I get the error below:
org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Cannot instantiate user function
I am not sure what environment change is causing this. Is there anything
you would suggest to resolve this issue?
I am attaching my sample code for the same.
Regards,
~Ramya.
On Thu, Jan 10, 2019 at 8:46 PM Dawid Wysakowicz <[hidden email]> wrote:
Hi,
I'm afraid you cannot write to different indices using the Table API
ElasticSearch connector. Now I know why you wanted to go through the
DataStream API.
To transform from Row to JSON, you can use
org.apache.flink.formats.json.JsonRowSerializationSchema from flink-json.
You just need to get the schema from the final Table of your Table API
part. Your code could look like this:
TypeInformation<Row> schema = table.getSchema().toRowType();
SerializationSchema<Row> serializationSchema =
        new JsonRowSerializationSchema(schema);

public IndexRequest createIndexRequest(Row element) {
    byte[] document = serializationSchema.serialize(element);
    ...
    return new IndexRequest(index, docType)
            .source(document, contentType);
}
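For the rolling daily indices, one option would be to derive the index name
per record inside createIndexRequest instead of hard-coding it. A rough,
untested sketch, assuming tumbleEnd ends up as the sixth field of the Row
(index 5) and comes back as java.sql.Timestamp; the "test-" prefix and the
ddMMyy pattern are just examples:

import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Formatter for the daily index suffix, e.g. "test-160119".
private static final DateTimeFormatter INDEX_SUFFIX =
        DateTimeFormatter.ofPattern("ddMMyy").withZone(ZoneOffset.UTC);

public IndexRequest createIndexRequest(Row element) {
    byte[] document = serializationSchema.serialize(element);
    // Assumes field 5 is the tumbleEnd column (Types.SQL_TIMESTAMP).
    java.sql.Timestamp tumbleEnd = (java.sql.Timestamp) element.getField(5);
    String index = "test-" + INDEX_SUFFIX.format(tumbleEnd.toInstant());
    return new IndexRequest(index, docType)
            .source(document, XContentType.JSON);
}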
Best,
Dawid
On 10/01/2019 10:34, Ramya Ramamurthy wrote:
Hi,
Sorry, I am a beginner here. I am not really sure how to build the dynamic
indices, i.e. the .index("test-ddmmyy") kind of indices.
I have set the watermark for my Kafka table source, but I am not sure how
this works with the ElasticSearch sink.
My sample code is pasted below:
tableEnv.connect(new Kafka()
        .version("0.11")
        .topic(params.getRequired("write-topic"))
        .property("bootstrap.servers", "localhost:9092")
        .sinkPartitionerRoundRobin())
    .withSchema(new Schema()
        .field("sid", Types.STRING())
        .field("ip", Types.STRING())
        .field("family", Types.STRING())
        .field("total_hits", Types.LONG())
        .field("tumbleStart", Types.SQL_TIMESTAMP())
        .field("tumbleEnd", Types.SQL_TIMESTAMP()))
    .withFormat(new Json().deriveSchema())
    .inAppendMode()
    .registerTableSink("sinkTopic");
new Elasticsearch()
    .version("6")
    .host("localhost", 9200, "http")
    .index("test") // <-- How to pass dynamic indices here, based on the packet received from the table sink?
    .documentType("user")
    .failureHandlerRetryRejected()
    .failureHandlerIgnore()
    .bulkFlushMaxSize("20 mb")
    .bulkFlushInterval(100000L)
    .bulkFlushBackoffMaxRetries(3)
    .connectionMaxRetryTimeout(3)
    .connectionPathPrefix("/v1")
Thanks again !!
On Thu, Jan 10, 2019 at 2:55 PM miki haiat <[hidden email]> wrote:
You can use Flink to manipulate the data by using
TimeCharacteristic.EventTime [1] and setting watermarks; see the sketch
after the links below.
Then, even if there is lag or some other issue, the data will be inserted
into the correct indices in Elastic.
A more specific way to implement it with Kafka is described in [2].
1.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#assigning-timestamps
2.
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
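As a rough, untested sketch (MyEvent and MyEventDeserializationSchema are
just placeholders for whatever your record type and deserializer are, and
the 10-second out-of-orderness bound is an arbitrary example):

import java.util.Properties;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaConsumer011<MyEvent> consumer =
        new FlinkKafkaConsumer011<>("read-topic", new MyEventDeserializationSchema(), kafkaProps);

// Take the event time from each record and tolerate records that arrive
// up to 10 seconds out of order before the watermark moves past them.
consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyEvent event) {
                return event.getEventTime(); // epoch millis, placeholder accessor
            }
        });

DataStream<MyEvent> events = env.addSource(consumer);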
On Thu, Jan 10, 2019 at 11:10 AM Ramya Ramamurthy <[hidden email]> wrote:
Hi Dawid,
Thanks for the quick reply.
I did try that, but I am not sure how to push into rolling indices here.
For example, I would maintain daily indices on ES and, based on the event
time, I would like to route the packets to the appropriate indices. If
there was some lag in the source Kafka and I received yesterday's data
[say at 00:05 or so], I am not sure how to pick the right index. Is there
a way to work around this?
Regards,
~Ramya.
On Thu, Jan 10, 2019 at 2:04 PM Dawid Wysakowicz <[hidden email]> wrote:
Hi Ramya,
Have you tried writing to ES directly from the Table API? You can check
the ES connector for the Table API here:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#elasticsearch-connector
Best,
Dawid
On 10/01/2019 09:21, Ramya Ramamurthy wrote:
Hi,
I am learning Flink. With Flink 1.7.1, I am trying to read from Kafka and
insert into ElasticSearch. I have a Kafka connector converting the data to
a Flink table. In order to insert into Elasticsearch, I have converted this
table to a datastream, so that I can use the ElasticSearchSink. But the
Rows returned by the stream have lost the schema. How do I convert them to
JSON before calling the Elasticsearch sink connector? Any help or
suggestions would be appreciated.
Thanks.