Hi there,

I'm using the Flink SQL client to run my jobs. My stream is JSON with nested objects. I couldn't find much documentation on querying nested JSON, so I had to flatten the JSON and query it as:

    SELECT `source.ip`, `destination.ip`, `dns.query`, `organization.id`, `dns.answers.data` FROM source;

Can someone help me query the nested JSON directly, so that I can save the resources spent on running the flattening job?

Thanks,
Srikanth
Hi Srikanth,

Flink SQL supports nested objects, so you should not need to run a separate flattening job. If you are using Kafka as the source for your stream, it should be fairly easy: you just need to define a proper JSON schema for your stream, as in these examples [1][2].

If you use a different source for your events, it might be a bit more involved, but you can still reuse the `JsonRowDeserializationSchema`. (Unfortunately, Kafka is the only table source that supports formats out of the box.) For example, have a look at this simple example, where I use a collection of strings as the input:

    // Imports for the snippet below (Flink 1.9 package names).
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.formats.json.JsonRowDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // A single JSON record, as raw bytes, with a nested object "obj".
    DataStream<byte[]> input = env.fromElements(
        "{\"lon\": 123.23, \"rideTime\": \"2019\", \"obj\": {\"numb\": 1234}}".getBytes()
    );

    // JSON schema describing the record, including the nested object.
    JsonRowDeserializationSchema jsonSchema = new JsonRowDeserializationSchema.Builder("{" +
        "  type: 'object'," +
        "  properties: {" +
        "    lon: {" +
        "      type: 'number'" +
        "    }," +
        "    rideTime: {" +
        "      type: 'string'" +
        "    }," +
        "    obj: {" +
        "      type: 'object'," +
        "      properties: {" +
        "        numb: {" +
        "          type: 'number'" +
        "        }" +
        "      }" +
        "    }" +
        "  }" +
        "}").build();

    // Deserialize each record into a Row typed according to the schema.
    TypeInformation<Row> producedType = jsonSchema.getProducedType();
    SingleOutputStreamOperator<Row> in = input.map(jsonSchema::deserialize)
        .returns(producedType);

    tEnv.registerDataStream("t", in);
    Table table = tEnv.sqlQuery("SELECT obj.numb FROM t"); // you can query nested fields

Hope that helps.

Best,
Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#json-format

On 17/10/2019 15:58, srikanth flink wrote:
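
To make concrete what a dotted path such as `obj.numb` refers to in a nested record, here is a minimal sketch in plain Java with no Flink dependency. The class `NestedFieldDemo` and its helpers `resolve` and `sampleRecord` are hypothetical names introduced only for illustration. This is not how Flink resolves fields internally; it only mirrors the idea that each path segment steps one level deeper into the object, so no flattened copy of the record is needed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not part of Flink.
public class NestedFieldDemo {

    // Walks a dotted path such as "obj.numb" through nested maps.
    // Returns null if a segment is missing or its parent is not an object.
    public static Object resolve(Map<String, Object> record, String path) {
        Object current = record;
        for (String segment : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    // Builds the same record as the JSON string in the example above:
    // {"lon": 123.23, "rideTime": "2019", "obj": {"numb": 1234}}
    public static Map<String, Object> sampleRecord() {
        Map<String, Object> obj = new LinkedHashMap<>();
        obj.put("numb", 1234);

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("lon", 123.23);
        record.put("rideTime", "2019");
        record.put("obj", obj);
        return record;
    }

    public static void main(String[] args) {
        Map<String, Object> record = sampleRecord();
        System.out.println(resolve(record, "obj.numb"));    // prints 1234
        System.out.println(resolve(record, "obj.missing")); // prints null
    }
}
```

In the SQL query the same path is written directly as `obj.numb`, and the nested structure is described once in the JSON schema rather than being unpacked by a separate job.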