The TypeInformation about Table API&SQL

韩宁宁
Dear All,
     My name is Han. I'm very interested in Flink and am currently learning it.
     I'm writing to the list with a question. I used the Table API & SQL to register a TableSource created with KafkaJsonTableSource, and it works very well. My code is below.
My question is about TypeInformation. The first parameter is an array of Strings with the names of the fields in the Kafka records, and the second parameter gives the type of each field, matched by position. In the code below, new String[]{"a","b","c"} is the first parameter and new TypeInformation<?>[]{Types.STRING(),Types.STRING(),Types.STRING()} is the second.
What if I don't know the field names in the Kafka records before registering the TableSource? In other words,
if the fields in the Kafka records are dynamic, how can I solve this problem?
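import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;
import org.apache.flink.streaming.connectors.kafka.KafkaJsonTableSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;

// environment, tableEnvironment, eltResultTopic and inputProperties are set up elsewhere.
// Fixed schema: field names and their types, matched by position.
TypeInformation<Row> typeInfo = Types.ROW(
        new String[]{"a", "b", "c"},
        new TypeInformation<?>[]{Types.STRING(), Types.STRING(), Types.STRING()}
);
KafkaJsonTableSource kafkaTableSource = new Kafka010JsonTableSource(eltResultTopic,
        inputProperties, typeInfo);
// Records that lack one of the declared fields yield null for it instead of failing.
kafkaTableSource.setFailOnMissingField(false);

tableEnvironment.registerTableSource("kafkaSource", kafkaTableSource);
Table sqlResult = tableEnvironment.sql("select a from kafkaSource");
// The result has a single STRING column, so it can be converted to a DataStream<String>.
DataStream<String> dataStream = tableEnvironment.toAppendStream(sqlResult, String.class);
dataStream.print();

environment.execute();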
I look forward to your reply. Thanks.
Best Wishes,
Han


Re: The TypeInformation about Table API&SQL

Timo Walther
Hi Han,

Generally, Flink is a strongly typed system: the field names and types of a table have to be known when the TableSource is registered and the query is translated. I think the easiest way to handle a dynamic schema is to read your JSON as a String. You can then implement your own ScalarFunction (or, in this case, also a TableFunction) [1] and use any JSON parsing library inside that function to preprocess the records into a common representation.
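To make the suggestion above more concrete, here is a minimal sketch of the parsing logic such a function could contain. It assumes each Kafka record is a flat JSON object (string, number, boolean or null values, no nested objects or arrays). The class name JsonField and its methods are hypothetical. To stay dependency-free it does not extend Flink's ScalarFunction or TableFunction and uses a small hand-written parser instead of a JSON library; in a real job the class would extend org.apache.flink.table.functions.ScalarFunction, keep the public eval method, and delegate parsing to a proper JSON library such as Jackson.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper sketching the body of a JSON-extracting UDF. Assumes flat JSON objects (no nesting);
// the hand-written parser only keeps the sketch dependency-free, a real UDF would use a JSON library.
final class JsonField {
    // ScalarFunction-style eval: returns the value of `key`, or null if it is absent or JSON null.
    public String eval(String json, String key) {
        return parseFlatObject(json).get(key);
    }

    // TableFunction-style variant: one (key, value) pair per field, in document order.
    // A Flink TableFunction would hand each pair to collect(...) instead of returning a list.
    public List<Map.Entry<String, String>> explode(String json) {
        return new ArrayList<>(parseFlatObject(json).entrySet());
    }

    // Parses a flat JSON object into key -> value; non-string values keep their literal text.
    static Map<String, String> parseFlatObject(String json) {
        Parser p = new Parser(json);
        Map<String, String> out = new LinkedHashMap<>();
        p.skipWs();
        p.expect('{');
        p.skipWs();
        if (p.peek() == '}') return out;
        while (true) {
            p.skipWs();
            String key = p.readString();
            p.skipWs();
            p.expect(':');
            p.skipWs();
            out.put(key, p.readValue());
            p.skipWs();
            char c = p.next();
            if (c == '}') return out;
            if (c != ',') throw new IllegalArgumentException("expected ',' or '}' at index " + (p.pos - 1));
        }
    }

    private static final class Parser {
        private final String s;
        private int pos = 0;
        Parser(String s) { this.s = s; }
        char peek() {
            if (pos >= s.length()) throw new IllegalArgumentException("unexpected end of input");
            return s.charAt(pos);
        }
        char next() { char c = peek(); pos++; return c; }
        void skipWs() { while (pos < s.length() && Character.isWhitespace(s.charAt(pos))) pos++; }
        void expect(char c) {
            if (next() != c) throw new IllegalArgumentException("expected '" + c + "' at index " + (pos - 1));
        }
        // Reads a quoted string and decodes the JSON escape sequences.
        String readString() {
            expect('"');
            StringBuilder sb = new StringBuilder();
            while (true) {
                char c = next();
                if (c == '"') return sb.toString();
                if (c != '\\') { sb.append(c); continue; }
                char e = next();
                switch (e) {
                    case '"': case '\\': case '/': sb.append(e); break;
                    case 'b': sb.append('\b'); break;
                    case 'f': sb.append('\f'); break;
                    case 'n': sb.append('\n'); break;
                    case 'r': sb.append('\r'); break;
                    case 't': sb.append('\t'); break;
                    case 'u': {
                        int code = 0;
                        for (int i = 0; i < 4; i++) {
                            int d = Character.digit(next(), 16);
                            if (d < 0) throw new IllegalArgumentException("bad unicode escape");
                            code = code * 16 + d;
                        }
                        sb.append((char) code);
                        break;
                    }
                    default: throw new IllegalArgumentException("bad escape character: " + e);
                }
            }
        }
        // Unquotes strings, maps null to Java null and returns other literals (numbers,
        // true, false) as their raw text; this sketch does not validate them further.
        String readValue() {
            char c = peek();
            if (c == '"') return readString();
            if (c == '{' || c == '[') throw new IllegalArgumentException("nested values are not supported");
            int start = pos;
            while (pos < s.length() && ",}".indexOf(s.charAt(pos)) < 0 && !Character.isWhitespace(s.charAt(pos))) pos++;
            if (pos == start) throw new IllegalArgumentException("missing value at index " + start);
            String literal = s.substring(start, pos);
            return literal.equals("null") ? null : literal;
        }
    }
}
```

If the class were changed to extend ScalarFunction, it could be registered with tableEnvironment.registerFunction("jsonField", new JsonField()) and called as SELECT jsonField(json, 'a') FROM kafkaSource, assuming kafkaSource is a table with a single String column named json (for example, a DataStream<String> consumed with FlinkKafkaConsumer010 and SimpleStringSchema, then registered via registerDataStream). The explode method shows the TableFunction-style alternative: every field becomes one (key, value) row, which gives all records the same fixed two-column schema regardless of which fields they contain.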

I hope this helps.

Regards,
Timo


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/udfs.html


On 10/23/17 at 5:16 AM, 韩宁宁 wrote: