Can a Flink query output nested JSON?


srikanth flink
I'm working with the Flink SQL client. The input data is in JSON format and contains nested JSON.

I'm trying to query the nested JSON from the table and expect the output to be nested JSON rather than a string.

I've built the environment file to define the table schema as:
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'lon': {
              type: 'string'
            },
            'rideTime': {
              type: 'string'
            },
            'nested': {
              type: 'object',
              properties: {
                'inner': {
                  type: 'string'
                },
                'nested1': {
                  type: 'object',
                  properties: {
                    'inner1': {
                      type: 'string'
                    }
                  }
                }
              }
            },
            'name': {
              type: 'string'
            }
          }
        }
      derive-schema: false
    schema:
      - name: 'lon'
        type: VARCHAR
      - name: 'rideTime'
        type: VARCHAR
      - name: 'nested'
        type: ROW<`inner` STRING, `nested1` ROW<`inner1` STRING>>
      - name: 'name'
        type: VARCHAR

Sink table schema:
    format:
      type: json
      fail-on-missing-field: false
      derive-schema: true
    schema:
      - name: 'nested'
        type: ROW<`inner` STRING>


I have been trying the following queries:
Flink SQL> insert into nestedSink select nested.`inner` as `nested.inner` from nestedJsonStream;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [nestedSink] do not match.
Query result schema: [nested.inner: String]
TableSink schema:    [nested: Row]

Flink SQL> insert into nestedSink select nested from nestedJsonStream;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [nestedSink] do not match.
Query result schema: [nested: Row]
TableSink schema:    [nested: Row]

Flink SQL> insert into nestedSink select nested.`inner` as nested.`inner` from nestedJsonStream;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 1, column 55.
Was expecting one of:
    <EOF>
    "EXCEPT" ...
    "FETCH" ...
    "FROM" ...
    "INTERSECT" ...
    "LIMIT" ...
    "OFFSET" ...
    "ORDER" ...
    "MINUS" ...
    "UNION" ...
    "," ...

Could you help me understand the problem with my schema/query?
I would also like to add new columns and nested columns.

Thanks
Srikanth



Re: Can a Flink query output nested JSON?

Fabian Hueske-2
Hi,

I did not understand what you are trying to achieve.
Which field of the input table do you want to write to the output table?

Flink SQL> insert into nestedSink select nested from nestedJsonStream;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [nestedSink] do not match.
Query result schema: [nested: Row]
TableSink schema:    [nested: Row]

This does not work because the nested schemas of the query result and the sink are not identical. Both are printed as "Row" in the error message, but their field lists differ.
The table sink has only one nested STRING field, called inner, whereas the query result has the type ROW<`inner` STRING, `nested1` ROW<`inner1` STRING>>.

You need to make sure that the schemas of the query result and the sink table are exactly the same. Otherwise, Flink will throw these ValidationExceptions.
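
If the goal is to write the whole nested field, one option is to declare the sink with the same row type as the source column. The fragment below is only a sketch of that idea, reusing the sink definition from the original post with the type widened; it has not been run against a cluster, and it assumes the rest of the nestedSink table definition stays unchanged.

```yaml
    # Hypothetical revision of the nestedSink definition (an assumption, not a tested config).
    # The only change from the original sink is the row type, which now mirrors
    # the 'nested' column of nestedJsonStream field for field.
    format:
      type: json
      fail-on-missing-field: false
      derive-schema: true
    schema:
      - name: 'nested'
        type: ROW<`inner` STRING, `nested1` ROW<`inner1` STRING>>
```

With a sink declared like this, the query result of "insert into nestedSink select nested from nestedJsonStream;" and the sink would have the same field name and the same nested type, which is the condition the validation checks.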

Best, Fabian

In reply to the message from srikanth flink <[hidden email]>, Thu, Oct 24, 2019, 12:24.