JDBC connector support for JSON

Fanbin Bu
Hi,

For a streaming job that uses the Kafka connector, this doc https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/json.html#format-options shows that we can parse the JSON data format. However, the Flink JDBC connector does not seem to support a JSON data type, at least according to this doc https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#data-type-mapping

So the question is: does the JDBC connector also have this capability? If not, what is required to enable it? At the end of the day, I would like to be able to write something like this:

create table aTable(field1 type, jsonField1 ROW<field2 type, jsonField2 ROW<field3 type> >) 
with 
(
'connector' = 'jdbc',
'url' = '...',
'table-name' = 'my-table-with-json-column',
...
)

tEnv.executeSql("select jsonField1.jsonField2.field3 from aTable")

Thanks,
Fanbin

Re: JDBC connector support for JSON

Matthias
Hi Fanbin,
I'm not that familiar with the Flink SQL features, but it looks like the JDBC connector does not support JSON, as stated in the documentation [1]. You might be able to work around it by implementing your own user-defined functions [2].

I hope this helps.
Matthias
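
To make the user-defined-function workaround more concrete, here is a minimal sketch of the extraction logic such a function could contain. It is not taken from the thread or from Flink itself: the class name `JsonPathSketch`, the dotted-path convention, and the tiny hand-written parser are all assumptions chosen to keep the example free of dependencies (a real job would more likely use a JSON library such as Jackson). It also assumes the JSON column reaches Flink as a plain STRING, which depends on the database and driver and is not confirmed here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper showing the core of a JSON-extracting UDF.
 * To use it in Flink it would need to be public and extend
 * org.apache.flink.table.functions.ScalarFunction, with eval as an instance method.
 */
final class JsonPathSketch {
    private final String src;
    private int pos = 0;

    private JsonPathSketch(String src) { this.src = src; }

    /** Returns the scalar at a dotted path such as "jsonField2.field3", or null if absent or not a scalar. */
    static String eval(String json, String dottedPath) {
        if (json == null || dottedPath == null) return null;
        Object node = new JsonPathSketch(json).parseValue();
        for (String key : dottedPath.split("\\.")) {
            if (!(node instanceof Map)) return null;
            node = ((Map<?, ?>) node).get(key);
        }
        // Strings and literal tokens are both stored as String; objects and arrays yield null.
        return (node instanceof String) ? (String) node : null;
    }

    private void skipWs() {
        while (pos < src.length() && Character.isWhitespace(src.charAt(pos))) pos++;
    }

    private void expect(char c) {
        if (pos >= src.length() || src.charAt(pos) != c)
            throw new IllegalArgumentException("Expected '" + c + "' at position " + pos);
        pos++;
    }

    private Object parseValue() {
        skipWs();
        char c = src.charAt(pos);
        if (c == '{') return parseObject();
        if (c == '[') return parseArray();
        if (c == '"') return parseString();
        int start = pos; // number, true, false, or null: read up to the next delimiter
        while (pos < src.length() && ",}] \t\r\n".indexOf(src.charAt(pos)) < 0) pos++;
        String token = src.substring(start, pos);
        return token.equals("null") ? null : token; // literals are kept as raw text
    }

    private Map<String, Object> parseObject() {
        Map<String, Object> map = new LinkedHashMap<>();
        pos++; // consume '{'
        skipWs();
        if (src.charAt(pos) == '}') { pos++; return map; }
        while (true) {
            skipWs();
            String key = parseString();
            skipWs();
            expect(':');
            map.put(key, parseValue());
            skipWs();
            if (src.charAt(pos) == ',') { pos++; continue; }
            expect('}');
            return map;
        }
    }

    private List<Object> parseArray() {
        List<Object> list = new ArrayList<>();
        pos++; // consume '['
        skipWs();
        if (src.charAt(pos) == ']') { pos++; return list; }
        while (true) {
            list.add(parseValue());
            skipWs();
            if (src.charAt(pos) == ',') { pos++; continue; }
            expect(']');
            return list;
        }
    }

    private String parseString() {
        expect('"');
        StringBuilder sb = new StringBuilder();
        while (src.charAt(pos) != '"') {
            char c = src.charAt(pos++);
            if (c != '\\') { sb.append(c); continue; }
            char e = src.charAt(pos++);
            switch (e) {
                case 'n': sb.append('\n'); break;
                case 't': sb.append('\t'); break;
                case 'r': sb.append('\r'); break;
                case 'b': sb.append('\b'); break;
                case 'f': sb.append('\f'); break;
                case 'u': sb.append((char) Integer.parseInt(src.substring(pos, pos + 4), 16)); pos += 4; break;
                default: sb.append(e); // covers \" \\ and \/
            }
        }
        pos++; // consume closing quote
        return sb.toString();
    }
}
```

If the class were adapted as described in its Javadoc, it could presumably be registered with `tEnv.createTemporarySystemFunction("json_field", JsonPathSketch.class)` and called as `select json_field(jsonField1, 'jsonField2.field3') from aTable`, with `jsonField1` declared as STRING instead of ROW. The function name `json_field` is made up for illustration, and the sketch does no error recovery on malformed input.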


On Wed, Mar 31, 2021 at 7:04 AM Fanbin Bu <[hidden email]> wrote: