Querying nested JSON stream?

srikanth flink
Hi there,

I'm using the Flink SQL client to run my jobs. My stream is JSON with nested objects. I couldn't find much documentation on querying nested JSON, so I had to flatten the JSON and query it like this:
SELECT `source.ip`, `destination.ip`, `dns.query`, `organization.id`, `dns.answers.data` FROM source;

Can someone help me with a query that reads the nested JSON directly, so that I can save the resources spent on running the flattening job?


Thanks
Srikanth

Re: Querying nested JSON stream?

Dawid Wysakowicz

Hi Srikanth,

Flink SQL supports nested objects, so you should not need to run a separate flattening job. If you are using Kafka as the source of your stream, it should be fairly easy: you just need to define a proper JSON schema for your stream, as in this example[1][2]. If you use a different source for your events, it is a bit more involved, but you can still reuse `JsonRowDeserializationSchema` (unfortunately, Kafka is the only table source that supports formats out of the box). For example, have a look at this simple example, where I use a collection of strings as the input:

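        import java.nio.charset.StandardCharsets;

        import org.apache.flink.api.common.typeinfo.TypeInformation;
        import org.apache.flink.formats.json.JsonRowDeserializationSchema;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.Table;
        import org.apache.flink.table.api.java.StreamTableEnvironment;
        import org.apache.flink.types.Row;

        public class NestedJsonQuery {

            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

                // A single nested JSON record, encoded as bytes like a raw message payload.
                DataStream<byte[]> input = env.fromElements(
                    "{\"lon\": 123.23, \"rideTime\": \"2019\", \"obj\": {\"numb\": 1234}}"
                        .getBytes(StandardCharsets.UTF_8)
                );

                // JSON schema of the records; the nested "obj" object becomes a nested Row field.
                JsonRowDeserializationSchema jsonSchema = new JsonRowDeserializationSchema.Builder("{" +
                    "  type: 'object'," +
                    "  properties: {" +
                    "    lon: {" +
                    "      type: 'number'" +
                    "    }," +
                    "    rideTime: {" +
                    "      type: 'string'" +
                    "    }," +
                    "    obj: {" +
                    "      type: 'object'," +
                    "      properties: {" +
                    "        numb: {" +
                    "          type: 'number'" +
                    "        }" +
                    "      }" +
                    "    }" +
                    "  }" +
                    "}").build();

                // Parse each payload into a Row. returns() supplies the row type, because Flink
                // cannot infer the Row's field names and types from the method reference.
                TypeInformation<Row> producedType = jsonSchema.getProducedType();
                SingleOutputStreamOperator<Row> in = input.map(jsonSchema::deserialize)
                    .returns(producedType);

                tEnv.registerDataStream("t", in);

                // Nested fields are addressed with a dot: obj.numb reads numb inside obj.
                Table table = tEnv.sqlQuery("SELECT obj.numb FROM t");

                // Print the result and start the job.
                tEnv.toAppendStream(table, Row.class).print();
                env.execute("nested-json-query");
            }
        }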
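Srikanth's flattened query has to backquote names like `source.ip` because, after flattening, each of them is a single top-level column whose name happens to contain a dot. The unquoted `obj.numb` in the query above instead steps into the nested `obj` field. The plain-Java sketch below is only a toy model of that difference. The class `NestedFieldAccess` and its methods are hypothetical, they are not Flink APIs, and this is not how Flink implements field access. In the model, a record is a map whose values may themselves be maps.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical toy model (not Flink code) of two ways to read a name like "source.ip".
public class NestedFieldAccess {

    // Nested access, like unquoted obj.numb: descend one map level per dot-separated segment.
    static Object resolveNested(Map<String, Object> row, String path) {
        Object current = row;
        for (String segment : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // the path goes deeper than the record does
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    // Quoted access, like `source.ip`: the whole string is one top-level column name.
    static Object resolveQuoted(Map<String, Object> row, String columnName) {
        return row.get(columnName);
    }

    public static void main(String[] args) {
        // Nested record shaped like {"source": {"ip": "10.0.0.1"}, "obj": {"numb": 1234}}.
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("ip", "10.0.0.1");
        Map<String, Object> obj = new LinkedHashMap<>();
        obj.put("numb", 1234);
        Map<String, Object> nestedRow = new LinkedHashMap<>();
        nestedRow.put("source", source);
        nestedRow.put("obj", obj);

        // The same data after a flattening job: dotted strings become column names.
        Map<String, Object> flatRow = new LinkedHashMap<>();
        flatRow.put("source.ip", "10.0.0.1");
        flatRow.put("obj.numb", 1234);

        System.out.println(resolveNested(nestedRow, "source.ip")); // prints 10.0.0.1
        System.out.println(resolveNested(nestedRow, "obj.numb"));  // prints 1234
        System.out.println(resolveQuoted(flatRow, "source.ip"));   // prints 10.0.0.1
        System.out.println(resolveQuoted(nestedRow, "source.ip")); // prints null: no such top-level column
    }
}
```

Reading the nested record with a dotted path gives the same value the flattening job had to materialize as a separate column, which is why the flattening step becomes unnecessary once the source schema declares the nested objects.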

Hope that helps.

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#json-format

(In reply to srikanth flink's message of 17/10/2019 15:58.)