Hi!

I want to use Flink SQL to process some JSON events, but defining a schema for the Flink SQL table is quite challenging. My data source's format is JSON like this:

{
  "top_level_key1": "some value",
  "nested_object": {
    "nested_key1": "abc",
    "nested_key2": 123,
    "nested_key3": ["element1", "element2", "element3"]
  }
}

The big challenges in defining a schema for this data source are:

1. The keys in nested_object are flexible; there might be 3 unique keys or more. If I enumerate all the keys in the schema, my code becomes fragile: how do I handle an event that contains more nested keys in nested_object?
2. I know the Table API supports the Map type, but I am not sure whether I can use a generic object as the map's value type, because the values in nested_object have different types: some are ints, some are strings or arrays.

So, how can I expose this kind of JSON data as a table in Flink SQL without enumerating all the nested keys?

Thanks.
Guodong |
Hi Guodong,

I think you almost have the answer:

1. Map type: this does not work with the current implementation. For example, with map<varchar, varchar>, if the value is a non-string JSON object, then `JsonNode.asText()` may not work as you wish.
2. List all the fields you care about. IMO, this fits your scenario. You can also set format.fail-on-missing-field = false so that fields missing from an event are set to null.

For 1, I think maybe we can support it in the future, and I've created a jira [1] to track this.

Guodong Wang <[hidden email]> wrote on Thu, May 28, 2020 at 6:32 PM:
Best, Benchao Li |
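To make approach 2 concrete, here is a minimal Python sketch of the intended semantics only; it is not Flink code. The field list `DECLARED_NESTED_FIELDS` and the function `project_event` are hypothetical names chosen for illustration. Undeclared keys are dropped, and a missing declared key either becomes null or causes a failure, depending on a flag that plays the role of format.fail-on-missing-field.

```python
import json

# Hypothetical declared schema: only the nested keys the queries care about.
DECLARED_NESTED_FIELDS = ["nested_key1", "nested_key2", "nested_key3"]


def project_event(raw, fail_on_missing_field=False):
    """Project one JSON event onto the declared fields.

    Undeclared keys are dropped. A missing declared key becomes None,
    unless fail_on_missing_field is True, in which case KeyError is raised.
    """
    event = json.loads(raw)
    nested = event.get("nested_object", {})
    row = {"top_level_key1": event.get("top_level_key1")}
    for name in DECLARED_NESTED_FIELDS:
        if name not in nested and fail_on_missing_field:
            raise KeyError(f"missing field: {name}")
        row[name] = nested.get(name)
    return row
```

The sketch also shows the drawback raised in the thread: a key that upstream adds later is silently dropped until the declared list is updated.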
Benchao,

Thank you for your quick reply. As you mentioned, for the current scenario, approach 2 should work for me. But it is a little annoying that I have to modify the schema to add new fields whenever the upstream app changes the JSON format or adds new fields. Otherwise, my users cannot refer to those fields in their SQL.

Per the description in the jira, I think that after it is implemented, all the JSON values will be converted to strings. I am wondering whether Flink SQL can or will support a flexible schema in the future: for example, registering the table without defining a specific schema for each field, and letting the user define a generic map or array for one field whose values can be any object. Then the type conversion cost might be saved.

Guodong

On Thu, May 28, 2020 at 7:43 PM Benchao Li <[hidden email]> wrote:
|
Hi Guodong,

It’s an interesting topic. This feature is closer to the scope of schema inference, which should come in the next few releases.

Best,
Leonard Xu
|
Hi Guodong,

Does the RAW type meet your requirements? For example, you could specify a map<varchar, raw> type, where each map value is the raw JsonNode parsed by Jackson. This is not supported yet; however, IMO it could be supported.

Guodong Wang <[hidden email]> wrote on Thu, May 28, 2020 at 9:43 PM:
Best, Benchao Li |
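The difference between a string-valued map and a raw-valued map can be sketched in Python as follows. This is an illustration only: Python's `json` module stands in for Jackson, and `to_string_map` and `to_raw_map` are hypothetical helpers rather than anything Flink provides (the thread notes that map<varchar, raw> is not supported yet).

```python
import json


def to_string_map(nested):
    """map<varchar, varchar>-like view: every value is rendered as JSON text."""
    return {key: json.dumps(value) for key, value in nested.items()}


def to_raw_map(nested):
    """map<varchar, raw>-like view: values stay as parsed objects."""
    return dict(nested)


event = json.loads(
    '{"nested_object": {"nested_key1": "abc", "nested_key2": 123,'
    ' "nested_key3": ["element1", "element2"]}}'
)
string_map = to_string_map(event["nested_object"])
raw_map = to_raw_map(event["nested_object"])
```

With the string view, a consumer has to parse `string_map["nested_key3"]` again before using it as a list, while the raw view keeps the original types and avoids that conversion cost.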
Yes. Setting the value type to raw is one possible approach. And I would like to vote for schema inference as well.

Correct me if I am wrong: IMO schema inference means I can provide a method in the table source to infer the data schema based on runtime computation, just like some Calcite adapters do. Right? For SQL table registration, I think that requiring the table source to provide a static schema might be too strict. Letting the planner infer the table schema would be more flexible.

Thank you for your suggestions.

Guodong

On Thu, May 28, 2020 at 11:11 PM Benchao Li <[hidden email]> wrote:
|
Hi Guodong,

After an offline discussion with Leonard, I think you have the right understanding of schema inference. But there are two cases here:

1. The schema of the data is fixed: schema inference can save you the effort of writing the schema explicitly.
2. The schema of the data is dynamic: in this case schema inference cannot help, because SQL is a somewhat static language, which needs to know all the data types at compile time.

Maybe I misunderstood your question at the very beginning. I thought your case was #2. If your case is #1, then schema inference is a good choice.

Guodong Wang <[hidden email]> wrote on Thu, May 28, 2020 at 11:39 PM:
Best, Benchao Li |
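One way to picture the two cases is a toy inference routine over sample records. This is a sketch under assumptions, not how Flink infers schemas: `infer_schema` is a hypothetical function, and Python type names stand in for SQL types. Fields are merged across records, and a field that appears with two different types is treated as the dynamic case that a single static schema cannot describe.

```python
import json


def infer_schema(records):
    """Infer {field: type name} from sample JSON records.

    Raises ValueError when one field shows up with conflicting types,
    which a single static schema cannot represent.
    """
    schema = {}
    for raw in records:
        for key, value in json.loads(raw).items():
            type_name = type(value).__name__
            # setdefault records the first type seen and returns it afterwards.
            if schema.setdefault(key, type_name) != type_name:
                raise ValueError(f"conflicting types for field {key!r}")
    return schema
```

For example, `infer_schema(['{"a": 1}', '{"a": 2, "b": "x"}'])` yields one schema covering both records, whereas `infer_schema(['{"a": 1}', '{"a": "x"}'])` raises because `a` has no single type.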
Benchao,

Thank you for your detailed explanation. Schema inference can solve my problem partially. For example, if from some point in time all subsequent JSON events contain a new field, I think schema inference will help. But if I need to handle JSON events with different schemas in one table (this is case 2), I agree with you: schema inference does not help.

Guodong

On Fri, May 29, 2020 at 11:02 AM Benchao Li <[hidden email]> wrote:
|
Hi all,

This is an interesting topic. Schema inference will be the next big feature planned in the next release. I added this thread link to FLINK-16420.

I think Guodong's case is schema evolution, which I think has something to do with schema inference. I don't have a clear idea for this yet, but some initial thoughts are:

1) Schema inference can happen for each query instead of when the table is created, so that once the data schema has evolved, the catalog table has the new schema. However, this may break existing queries on this catalog table (e.g. SELECT * FROM T).
2) Manually create a new table with schema inference. We can use the LIKE grammar or SHOW CREATE TABLE to help create a table based on existing ones. The new table has the new schema because we infer the schema again.
3) Automatically create a new table with schema inference. This can be done with some catalogs, for example SchemaRegistryCatalog: once a new Avro schema (say schema id = 100) is added to the registry, users can use this new schema with table "mytopic-100".

Best,
Jark

On Fri, 29 May 2020 at 22:05, Guodong Wang <[hidden email]> wrote:
|
Hi Jark,

You totally got my point. Actually, the perfect solution in my opinion is to support schema evolution within one query. Although classic SQL needs to know the schema before doing any computation, when integrating a NoSQL data source into a Flink DataStream, schema evolution, if possible, would save users tons of time. For example, when I have some JSON docs in MongoDB, I want to expose the collections as tables in Flink SQL. But aligning the schema in the Flink catalog service is not very friendly: I need to remember to update the catalog whenever I add a new field in my database.

However, it is not easy to validate SQL correctly if there is no schema information about the table. For example, with "select sum(amount) from my_table group by category", if the amount field is not a number, a runtime error will be thrown. I think this is another challenge in supporting schema evolution. Anyway, I think deferring the errors to runtime is fair when the user wants schema flexibility.

Guodong

On Mon, Jun 1, 2020 at 12:29 PM Jark Wu <[hidden email]> wrote:
|
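The trade-off in the last message, deferring type errors to runtime, can be sketched in Python. This is an illustration of the idea only: `sum_amount_by_category` is a hypothetical function that mimics "select sum(amount) from my_table group by category" over schema-less rows and checks the type of amount only when each row is processed.

```python
from collections import defaultdict
from numbers import Number


def sum_amount_by_category(rows):
    """Schema-less equivalent of:
    select sum(amount) from my_table group by category
    """
    totals = defaultdict(int)
    for row in rows:
        amount = row["amount"]
        # bool is a Number subclass in Python, so reject it explicitly.
        if isinstance(amount, bool) or not isinstance(amount, Number):
            raise TypeError(f"amount is not a number: {amount!r}")
        totals[row["category"]] += amount
    return dict(totals)
```

Without a declared schema nothing can reject the query up front, so a row such as `{"category": "a", "amount": "12"}` only fails once it is reached.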