Re: Re: Re: Re: How to verify whether the data sent to Elasticsearch is correct?


Re: Re: Re: Re: How to verify whether the data sent to Elasticsearch is correct?

Ted Yu
Did you use an image for the code?
Can you send the plain code again?
Cheers

-------- Original message --------
From: mingleizhang <[hidden email]>
Date: 8/16/17 6:16 PM (GMT-08:00)
To: mingleizhang <[hidden email]>
Cc: "Tzu-Li (Gordon) Tai" <[hidden email]>, [hidden email]
Subject: Re: Re: Re: Re: How to verify whether the data sent to Elasticsearch is correct?

I solved the issue by adding a dependency that converts the protobuf objects into JSON, and by adding a line of code like the one below (element is a PB object).


Thanks.
zhangminglei



At 2017-08-16 22:52:30, "mingleizhang" <[hidden email]> wrote:
I looked into the data sunk to Elasticsearch. The good news is that it really is there. However, I sank the data as an object, but Elasticsearch represents it as a string. I put the related code below.

The element type is ActivityInfo. I then wrote a Java API to read the data, but the value comes back as a string. I want it represented as an ActivityInfo object, but it did not work the way I wanted.

Can anybody give me some advice? Thank you very much!





Thanks
zhangminglei / mingleizhang



At 2017-08-16 20:52:34, "mingleizhang" <[hidden email]> wrote:

Hi, Gordon.

      I am not sure about this. As far as I know, Elasticsearch usually stores JSON data, since that is convenient for building its index. As my code below shows, I stored protobuf objects (ActivityInfo, built from the activityinfo.proto file) in Elasticsearch, and they end up stored as binary data. That feels very strange to me. The Flink documentation only gives an example whose data is a JSON string.

Peace,
Zhangminglei




At 2017-08-16 13:27:10, "Tzu-Li (Gordon) Tai" <[hidden email]> wrote:
Hi,

I couldn’t spot anything off in the code snippet you provided. So you should be ok with this :)

Cheers,
Gordon


On 15 August 2017 at 9:18:59 PM, mingleizhang ([hidden email]) wrote:

BTW, ActivityInfo is a PB object built from xxx.proto, and its values have already been set.




At 2017-08-15 21:17:00, "mingleizhang" <[hidden email]> wrote:
Hi, flink experts!

I sank my data (PB objects) to Elasticsearch, but I don't know whether the sunk data is correct or not. The code is like the following; could you help me check it, please? I'm not familiar with ES. Now I want to install Kibana to view my data, but I don't know whether the code below is correct or incorrect. I ran the Flink program and it does not give me an error; I just want to confirm.

// sink the filtered data to Elasticsearch
clickStreamFiltered.addSink(new ElasticsearchSink[ActivityInfo](configElasticSearch, transportAddress, new ElasticsearchSinkFunction[ActivityInfo] {
  def createIndexRequest(element: ActivityInfo): IndexRequest = {
    val json = new java.util.HashMap[String, ActivityInfo]
    json.put("data", element)
    Requests.indexRequest().index("filter-index-s").`type`("my-type").source(json)
  }
  override def process(activityInfo: ActivityInfo, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
    requestIndexer.add(createIndexRequest(activityInfo))
  }
}))
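The follow-ups above report that the stored value comes back as a string rather than an ActivityInfo object. A rough plain-Java illustration of one way that can happen; this is not the actual Elasticsearch client code, and the ActivityInfo stand-in and its field are hypothetical. The idea is that when a source map holds a value with no obvious JSON mapping, it can degrade to the value's toString():

```java
// Stand-in for the generated protobuf class; the field is hypothetical.
class ActivityInfo {
    private final String mid;
    ActivityInfo(String mid) { this.mid = mid; }
    @Override public String toString() { return "ActivityInfo(mid=" + mid + ")"; }
}

class SourceSketch {
    // Rough illustration only: values with no JSON mapping degrade to strings.
    static Object naiveSourceValue(Object v) {
        if (v instanceof String || v instanceof Number || v instanceof Boolean) {
            return v;            // already a JSON-representable scalar
        }
        return v.toString();     // arbitrary objects end up as their toString()
    }
}
```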

Thanks
mingleizhang


Re: Re: Re: Re: Re: How to verify whether the data sent to Elasticsearch is correct?

zhangminglei
Ahhh, sorry Ted. I didn't see that the code was broken. Yep, I will put the code here directly as text.

The dependency is:

<dependency>
  <groupId>com.googlecode.protobuf-java-format</groupId>
  <artifactId>protobuf-java-format</artifactId>
  <version>1.2</version>
</dependency>

And the added code is like the following. This time, although I still sink an object to Elasticsearch, I convert it to JSON with JsonFormat.printToString(element). That solves my issue: I can get my data from Elasticsearch represented as a JSON string, and then use it to show my data in a front end.
// sink the filtered data to Elasticsearch
clickStreamFiltered.addSink(new ElasticsearchSink[ActivityInfo](configElasticSearch, transportAddress, new ElasticsearchSinkFunction[ActivityInfo] {
  def createIndexRequest(element: ActivityInfo): IndexRequest = {
    val json = new java.util.HashMap[String, AnyRef]
    json.put("activityInfo", element)
    json.put("mid", element.getMid)
    json.put("activity", element.getActivity)
    json.put("json", JsonFormat.printToString(element))
    Requests.indexRequest().index("filter_event_tracking").`type`("my-type-2").source(json)
  }
  override def process(activityInfo: ActivityInfo, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
    requestIndexer.add(createIndexRequest(activityInfo))
  }
}))
Peace.
zhangminglei / mingleizhang




Re: Re: Re: Re: Re: How to verify whether the data sent to Elasticsearch is correct?

Tzu-Li (Gordon) Tai
Hi,

I see what you were asking about now.

Yes, it doesn't make sense to sink a raw object to Elasticsearch. You either need to transform the object into JSON using libraries like Protobuf / Jackson / etc., or break it down yourself into a Map.

One thing I noticed is:

json.put("json", JsonFormat.printToString(element))

If what you want is for all the fields of this object to be queryable in Elasticsearch, you could simply do:

`Requests.indexRequest().index(...).type(...).source(<the JSON string>)`

This would work fine, and you also save the extra redundant layer in the JSON you sink.
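The suggestion above can be sketched in isolation. Everything Elasticsearch-specific is elided, the ActivityInfo stand-in and its fields are hypothetical, and toJson only mimics what JsonFormat.printToString would produce; the point is that the pre-rendered JSON string itself becomes the document source, with no wrapping map:

```java
// Stand-in for the generated protobuf class (both fields are hypothetical).
class ActivityInfo {
    private final String mid;
    private final String activity;
    ActivityInfo(String mid, String activity) { this.mid = mid; this.activity = activity; }
    String getMid() { return mid; }
    String getActivity() { return activity; }
}

class JsonSourceSketch {
    // Mimics what JsonFormat.printToString would produce for the stand-in:
    // a flat JSON object whose fields are individually queryable once indexed.
    static String toJson(ActivityInfo e) {
        return "{\"mid\":\"" + e.getMid() + "\",\"activity\":\"" + e.getActivity() + "\"}";
    }
}
```

With the real client, that string would be passed straight to the index request's source(...), instead of being stored under a "json" key inside a map.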



Re: Re: Re: Re: Re: Re: How to verify whether the data sent to Elasticsearch is correct?

zhangminglei
Hello, Gordon.

Thank you very much. And I did remove the extra redundant code. But I have to keep the following line, because I use Spring Boot to fetch the data from Elasticsearch (the other lines can be deleted). If I do not write json.put("json", JsonFormat.printToString(element)), I only get back a plain map of "key":"value" pairs via Spring Boot, and I cannot get what I want.

json.put("json", JsonFormat.printToString(element))

So when I add the line above, the data I fetch from Elasticsearch via Spring Boot comes back as json -> "key":"value", "key":"value". This time I can get the protobuf object represented as a JSON string, and then I can do what I want!

But the three lines below really can be removed from my project:

json.put("activityInfo", element)
json.put("mid", element.getMid)
json.put("activity", element.getActivity)
Peace.
zhangminglei
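The retrieval side described above can be sketched without Spring Boot or an Elasticsearch client. The map below is a hand-built stand-in for what a search hit's _source would return; the front end only needs the pre-rendered JSON string stored under the "json" key:

```java
class RetrievalSketch {
    // Pulls the pre-rendered JSON string out of a search hit's _source map.
    static String extractJson(java.util.Map<String, Object> source) {
        return (String) source.get("json");
    }
}
```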

