Table API and nested JSON

Pramit Vamsi
Hi,

I am working with nested JSON, for example:

{
  "document": {
    "_id": "qwery",
    "meetingstatus": 3,
    "city": 100,
    "users": {
      "created": "5c9243033eee61a14e5b",
      "assigned": "5c9496ad1e91f10f44f"
    }
  },
  "operation": "update"
}

Code usage:
tableEnv.connect(new Kafka()
                .version("0.11")
                .topic(".....")
                .property("bootstrap.servers", bootStrapServer)
                .startFromGroupOffsets())
                .withSchema(new Schema()
                        .field("op", Types.STRING())
                        .from("operation")
                        .field("agent", Types.INT())
                        // nested path; this mapping triggers the ValidationException below
                        .from("document.users.created")
                        .field("ts", Types.SQL_TIMESTAMP())
                        .rowtime(new Rowtime()
                                .timestampsFromField("document.updatedAt")
                                .watermarksPeriodicBounded(2000)
                        )
                )
                .withFormat(new Json().failOnMissingField(false)
                        .jsonSchema("{\n" +
                        "  \"definitions\": {},\n" +
                        "  \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n" +
......
)

I am seeing this exception:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Table field 'agent' was resolved to TableSource return type field 'document.users.created', but field 'document.users.created' was not found in the return type Row(document: Row(_id: String, meetingstatus: BigDecimal, city: BigDecimal, users: Row(created: String, assigned: String), ...), operation: String) of the TableSource. Please verify the field mapping of the TableSource.
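To illustrate why the check fails, here is a simplified, self-contained model (an assumption, not Flink's actual code). The TableSource return type quoted in the exception is represented as nested `LinkedHashMap`s, and `lookupTopLevel` is a hypothetical helper that, assuming the lookup only checks top-level field names, treats the whole dotted string as a single name. Only `document` and `operation` exist at the top level, so `document.users.created` is not found.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model (an assumption, not Flink code): a row type is an ordered map
// from field name to either a type name (String) or a nested row (Map).
class FlatLookupSketch {

    // Mirrors the return type in the exception:
    // Row(document: Row(_id, meetingstatus, city, users: Row(created, assigned)), operation)
    static Map<String, Object> returnType() {
        Map<String, Object> users = new LinkedHashMap<>();
        users.put("created", "String");
        users.put("assigned", "String");

        Map<String, Object> document = new LinkedHashMap<>();
        document.put("_id", "String");
        document.put("meetingstatus", "BigDecimal");
        document.put("city", "BigDecimal");
        document.put("users", users);

        Map<String, Object> root = new LinkedHashMap<>();
        root.put("document", document);
        root.put("operation", "String");
        return root;
    }

    // Hypothetical top-level-only lookup: the dotted string is one key, never split.
    static Object lookupTopLevel(Map<String, Object> rowType, String fieldName) {
        return rowType.get(fieldName);
    }

    public static void main(String[] args) {
        Map<String, Object> type = returnType();
        System.out.println(lookupTopLevel(type, "operation"));              // String
        System.out.println(lookupTopLevel(type, "document.users.created")); // null, i.e. "not found"
    }
}
```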

How should I address the nested properties and use them in .from()?

Thanks,
Pramit

Re: Table API and nested JSON

Pramit Vamsi
I looked at the implementation of the resolveInputField method in TableSourceUtil. It seems that the field-name lookup happens only at the top level of the JSON and does not descend into the nested structure.

The returnType references a RowTypeInfo, which reflects the correct nested schema.
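For contrast, a nested-aware lookup would split the path on dots and descend one nested row per segment. The sketch below models that with nested `LinkedHashMap`s standing in for the nested RowTypeInfo; `sampleType` and `resolvePath` are hypothetical helpers for illustration only, not part of `TableSourceUtil` or any Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical nested-aware resolver (not Flink's TableSourceUtil): splits a dotted
// path and descends one nested row (Map) per segment.
class NestedPathSketch {

    // Stand-in for the nested return type from the exception message.
    static Map<String, Object> sampleType() {
        Map<String, Object> users = new LinkedHashMap<>();
        users.put("created", "String");
        users.put("assigned", "String");
        Map<String, Object> document = new LinkedHashMap<>();
        document.put("_id", "String");
        document.put("meetingstatus", "BigDecimal");
        document.put("city", "BigDecimal");
        document.put("users", users);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("document", document);
        root.put("operation", "String");
        return root;
    }

    // Returns the leaf type name, a nested row (Map), or null if any segment is missing.
    @SuppressWarnings("unchecked")
    static Object resolvePath(Map<String, Object> rowType, String path) {
        Object current = rowType;
        for (String segment : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // tried to descend into a leaf (non-row) type
            }
            current = ((Map<String, Object>) current).get(segment);
            if (current == null) {
                return null; // segment not present at this nesting level
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> root = sampleType();
        System.out.println(resolvePath(root, "document.users.created")); // String
        System.out.println(resolvePath(root, "document.user.created"));  // null (misspelled segment)
    }
}
```

Returning `null` for a missing segment corresponds to the "not found" case in the exception; the difference from a top-level-only lookup is solely that each dot moves one level deeper into the row type.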

Any pointers here?

In reply to my message of Mon, Jun 3, 2019 at 12:02 AM.