flink sql about nested json

sanmutongzi
Hi,
I am trying to use the Flink SQL API to read JSON-formatted data from a Kafka topic.
My JSON schema is nested, like this:
{
  "type": "object",
  "properties": {
    "table": {
      "type": "string"
    },
    "str2": {
      "type": "string"
    },
    "obj1": {
      "type": "object",
      "properties": {
        "rkey": {
          "type": "string"
        },
        "val": {
          "type": "string"
        },
        "lastTime": {
          "type": "number"
        }
      },
      "required": ["lastTime", "rkey", "val"]
    },
    "obj2": {
      "type": "object",
      "properties": {
        "val": {
          "type": "string"
        },
        "lastTime": {
          "type": "number"
        }
      },
      "required": ["lastTime", "val"]
    }
  },
  "required": ["table", "str2", "obj1", "obj2"]
}

I defined a table schema like this:

Schema schemaDesc1 = new Schema()
        .......
        .field("tablestr", Types.STRING).from("table")
        .......
        .field("rkey", Types.STRING).from("rkey");


When I run a debug case, I get an error about the "rkey" field (the field nested in obj1):
"SQL validation failed. Table field 'rkey' was resolved to TableSource return type field 'rkey', but field 'rkey' was not found in the return type Row".

My question is: does the org.apache.flink.table.descriptors.Json format support nested JSON schemas? If it does, how can I set the right format or schema? If not, how can I apply the Flink SQL API to a nested JSON data source?
Re: flink sql about nested json

Timo Walther
Hi,

The Flink SQL JSON format supports nested structures like the schema that
you posted. Maybe the renaming with `from()` does not work as expected. Did
you try it without `from()`, so that the schema field names are equal to the
JSON field names?

Alternatively, you could define only the table schema and use the
`deriveSchema()` mode of the format.
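
As a rough sketch of the `deriveSchema()` suggestion, assuming the descriptor API of Flink 1.7/1.8 and a Kafka source: the topic name, broker address, connector version, and the class name `NestedJsonSource` are placeholders that do not come from this thread, and only two of the fields are declared to keep it short. The nested object is declared as a named row type, and the format schema is derived from the table schema instead of being specified separately.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class NestedJsonSource {

    // Registers a Kafka-backed table whose JSON format schema is derived
    // from the table schema declared in withSchema(...).
    public static void register(StreamTableEnvironment tableEnv) {
        tableEnv
            .connect(new Kafka()
                .version("0.11")                                   // assumed connector version
                .topic("my-topic")                                 // hypothetical topic name
                .property("bootstrap.servers", "localhost:9092"))  // hypothetical broker
            .withFormat(new Json().deriveSchema())
            .withSchema(new Schema()
                // Top-level string field, same name as in the JSON document.
                .field("str2", Types.STRING)
                // The nested object is one field of type ROW with named sub-fields.
                .field("obj1", Types.ROW_NAMED(
                    new String[] {"rkey", "val", "lastTime"},
                    Types.STRING, Types.STRING, Types.BIG_DEC)))
            .inAppendMode()
            .registerTableSource("mytable");
    }
}
```

With a table registered this way, a nested value would be selected through its parent field, for example `SELECT obj1.rkey FROM mytable`.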

By the way, there is a significant bug in the JSON format that could affect
how rows are parsed (https://issues.apache.org/jira/browse/FLINK-11727).

It may be worth writing your own format and implementing the JSON parsing
logic the way you would like it.

Regards,
Timo

On 04.03.19 at 08:38, 杨光 wrote:

> My question is: does the org.apache.flink.table.descriptors.Json
> format support nested JSON schemas? If it does, how can I set the right
> format or schema? If not, how can I apply the Flink SQL API to a
> nested JSON data source?


Re: flink sql about nested json

sanmutongzi
Hi Timo,
I got the nested value by changing the Schema definition like this:
  Schema schemaDesc1 = new Schema()
        .field("str2", Types.STRING)
        .field("tablestr", Types.STRING).from("table")
        .field("obj1", Types.ROW_NAMED(new String[]{"rkey", "val", "lastTime"}, Types.STRING, Types.STRING, Types.BIG_DEC));

and the SQL like this:

  SELECT tablestr, obj1.rkey FROM mytable ...

So it looks like it is not a problem with the JSON format parser; it is about how to define a nested Schema and how to select the nested fields in SQL.
Is there any documentation where I can learn more about this?
Thanks!
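
The distinction described above can be illustrated with a small model in plain Java. This is only a sketch of the idea, not Flink's resolution logic: the class `NestedFieldLookup`, the method `resolve`, and the sample values are hypothetical. Nested maps stand in for the row, so `rkey` is not visible at the top level but can be reached through `obj1`, in the same way that the query selects `obj1.rkey`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NestedFieldLookup {

    // Resolves a dotted path such as "obj1.rkey" against nested maps.
    // Returns null as soon as a segment is missing or is not a nested object.
    public static Object resolve(Map<String, Object> row, String path) {
        Object current = row;
        for (String segment : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    public static void main(String[] args) {
        // Hypothetical record shaped like the JSON schema in this thread.
        Map<String, Object> obj1 = new LinkedHashMap<>();
        obj1.put("rkey", "k1");
        obj1.put("val", "v1");
        obj1.put("lastTime", 1551684000);

        Map<String, Object> row = new LinkedHashMap<>();
        row.put("table", "t1");
        row.put("str2", "s1");
        row.put("obj1", obj1);

        // "rkey" is not a top-level field, so the flat lookup finds nothing.
        System.out.println(resolve(row, "rkey"));       // prints "null"
        // The path through the parent object reaches the nested value.
        System.out.println(resolve(row, "obj1.rkey"));  // prints "k1"
    }
}
```

The first lookup corresponds to declaring `rkey` as a top-level table field, the second to declaring `obj1` as a row type and selecting `obj1.rkey`.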

On Tue, Mar 5, 2019 at 12:13 AM, Timo Walther <[hidden email]> wrote:
> Hi,
>
> The Flink SQL JSON format supports nested structures like the schema that
> you posted. Maybe the renaming with `from()` does not work as expected. Did
> you try it without `from()`, so that the schema field names are equal to the
> JSON field names?
>
> Alternatively, you could define only the table schema and use the
> `deriveSchema()` mode of the format.
>
> By the way, there is a significant bug in the JSON format that could affect
> how rows are parsed (https://issues.apache.org/jira/browse/FLINK-11727).
>
> It may be worth writing your own format and implementing the JSON parsing
> logic the way you would like it.
>
> Regards,
> Timo


