Apache Flink and Elasticsearch send Json Object instead of string

Fábio Dias
Hi,

I'm using Flink and Elasticsearch, and I want Elasticsearch to receive a JSON object ({"id":1, "name":"X"}, etc.). I already have a string containing this information, but I don't want it to be stored as a string.

I receive this:

{
  "_index": "logs",
  "_type": "object",
  "_id": "AVpcARfkfYWqSubr0ZvK",
  "_score": 1,
  "_source": {
    "data": "{\"id\":6,\"name\":\"A green door\",\"price\":12.5,\"tags\":[\"home\",\"green\"]}"
  }
}

And I want to receive this:

{
  "_index": "logs",
  "_type": "external",
  "_id": "AVpcARfkfYWqSubr0ZvK",
  "_score": 1,
  "_source": {
    "data": {
      "id": 6,
      "name": "A green door",
      "price": 12.5,
      "tags": ["home", "green"]
    }
  }
}

my java code:

try {
    ArrayList<InetSocketAddress> transports = new ArrayList<>();
    transports.add(new InetSocketAddress("127.0.0.1", 9300));

    ElasticsearchSinkFunction<String> indexLog = new ElasticsearchSinkFunction<String>() {

        private static final long serialVersionUID = 8802869701292023100L;

        public IndexRequest createIndexRequest(String element) {
            HashMap<String, Object> esJson = new HashMap<>();
            esJson.put("data", element);

            return Requests
                    .indexRequest()
                    .index("logs")
                    .type("object")
                    .source(esJson);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    };

    ElasticsearchSink<String> esSink = new ElasticsearchSink<String>(config, transports, indexLog);
    input.addSink(esSink);
} catch (Exception e) {
    System.out.println(e);
}


Do I need to treat every entry as a map? Can I just send an object with key-value pairs?

Thanks.

Re: Apache Flink and Elasticsearch send Json Object instead of string

Tzu-Li (Gordon) Tai
Hi,

I’ll use your code to explain.

public IndexRequest createIndexRequest(String element) {
    HashMap<String, Object> esJson = new HashMap<>();
    esJson.put("data", element);

What you should do here is parse the field values from `element`, and simply treat them as key-value pairs of the `esJson` map.

So, the `esJson` should be prepared by doing:

esJson.put("id", 6);
esJson.put("name", "A green door");
esJson.put("price", 12.5);

etc.
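
To make the key-value idea concrete, here is a minimal sketch using only the JDK. It assumes the field values have already been extracted from `element` (they are hard-coded below; in a real job a JSON library would do the parsing), and it nests them under a "data" key only to match the desired output from the original question. `TypedDocument` and `build` are hypothetical names, not part of Flink or Elasticsearch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Flink or Elasticsearch.
final class TypedDocument {

    // Builds a map of the kind that could be handed to IndexRequest.source(Map).
    // Values are hard-coded here; in practice they would be parsed from `element`.
    static Map<String, Object> build() {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("id", 6);                                 // Integer, becomes a JSON number
        data.put("name", "A green door");                  // String
        data.put("price", 12.5);                           // Double
        data.put("tags", Arrays.asList("home", "green"));  // List, becomes a JSON array

        // A nested Map becomes a JSON object rather than an escaped string.
        Map<String, Object> esJson = new LinkedHashMap<>();
        esJson.put("data", data);
        return esJson;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

The point is that each value keeps its Java type (number, string, list, nested map), whereas putting the whole `element` string under "data" stores it as one string field.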


Cheers,

Gordon



In reply to Fábio Dias's message of February 21, 2017 at 12:41:40 AM.

Re: Apache Flink and Elasticsearch send Json Object instead of string

Fábio Dias
Hi, 
thanks for the reply.

Isn't there another way to do that?
Using REST, you can send JSON like this:

curl -XPOST 'localhost:9200/customer/external?pretty' -H 'Content-Type: application/json' -d'
{
  "name": "Jane Doe"
}
'

In my case, I have JSON like this:

{
  "filters": {
    "id": 1,
    "name": "abc"
  }
}

How can I handle cases like this? Isn't there a way to send the whole JSON element and have it indexed, as in the REST request?

Thanks.

In reply to Tzu-Li (Gordon) Tai's message of Tuesday, 21/02/2017 at 07:54.

Re: Apache Flink and Elasticsearch send Json Object instead of string

Tzu-Li (Gordon) Tai
Hi,

The Flink Elasticsearch Sink uses the Elasticsearch Java client to send the indexing requests, so whatever the client supports can also be done through the `ElasticsearchSinkFunction`.

From a quick look at the Elasticsearch Javadocs, I think you can also just set the document JSON as a String in the created `IndexRequest`. So,

public IndexRequest createIndexRequest(String element) {
    HashMap<String, Object> esJson = new HashMap<>();
    esJson.put("data", element);

    return Requests
            .indexRequest()
            .index("logs")
            .type("object")
            .source(esJson);
}

Here, if `element` is already a JSON string representing the document, you can just do

return Requests
        .indexRequest()
        .index("logs")
        .type("object")
        .source(element);  // element holds the JSON document as a String

The `.source(…)` method has quite a few variants on how to set the source, and providing a Map is only one of them.
Please refer to the Elasticsearch Javadocs for the full list (https://www.javadoc.io/doc/org.elasticsearch/elasticsearch/5.2.1).
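
Note that passing the string directly makes it the whole document, so its fields end up at the top level of `_source`. If the document should still sit under a "data" field, as in the desired output from the original question, one option is to wrap the string first. The following is only a sketch: it assumes `element` is already valid JSON and does no validation or escaping, and `JsonWrapper` and `wrapUnderField` are hypothetical names, not part of Flink or Elasticsearch.

```java
// Hypothetical helper for illustration; not part of Flink or Elasticsearch.
final class JsonWrapper {

    // Wraps an already-serialized JSON value under a single top-level field.
    // Assumes `json` is valid JSON and `field` needs no escaping; neither is checked.
    static String wrapUnderField(String field, String json) {
        return "{\"" + field + "\":" + json.trim() + "}";
    }

    public static void main(String[] args) {
        String element = "{\"filters\":{\"id\":1,\"name\":\"abc\"}}";
        // prints {"data":{"filters":{"id":1,"name":"abc"}}}
        System.out.println(wrapUnderField("data", element));
    }
}
```

The wrapped string could then be given to `.source(...)` in place of `element`. Because nothing is parsed, malformed input would only be rejected when Elasticsearch tries to index it.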

Hope this helps!

Cheers,
Gordon

In reply to Fábio Dias's message of February 21, 2017 at 5:43:36 PM.