Hi guys,

I have been studying the Table API for a while now with the goal of integrating it into my system. Looking at this documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors I understand that it is possible to apply a JSON format on the connector and supply a JSON schema without any hardcoded Java POJO:

    .jsonSchema(
        "{"
        + "  type: 'object',"
        + "  properties: {"
        + "    lon: {"
        + "      type: 'number'"
        + "    },"
        + "    rideTime: {"
        + "      type: 'string',"
        + "      format: 'date-time'"
        + "    }"
        + "  }"
        + "}")

But my problem is the following: my data comes from a REST API, so I have to process it myself and emit it through a DataStream, and the conversion from a DataStream to a Table has to go through a Java POJO:

    DataStream<YourPojo> input = ...;
    Table table = tEnv.fromDataStream(input);

I tried a trick, converting my JSON to Avro using a GenericRecord, but it does not seem possible. My use case is to add REST API processing at runtime and to externalize and dynamically load my POJO/schema, without hardcoding a Java POJO class.

Do you have an approach to suggest?

Thanks a lot
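For reference, a minimal sketch of the descriptor-based setup the linked documentation describes, assuming a Kafka connector and the two fields from the schema above (topic name and server address are placeholders):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Schema;

    // The JSON schema is a plain string, so no compiled POJO is needed.
    tEnv.connect(new Kafka()
            .version("universal")
            .topic("rides")                                   // placeholder topic
            .property("bootstrap.servers", "localhost:9092")) // placeholder address
        .withFormat(new Json().jsonSchema(
            "{ type: 'object', properties: {"
            + " lon: { type: 'number' },"
            + " rideTime: { type: 'string', format: 'date-time' }"
            + "} }"))
        .withSchema(new Schema()
            .field("lon", Types.DOUBLE)
            .field("rideTime", Types.SQL_TIMESTAMP))
        .inAppendMode()
        .registerTableSource("Rides");

The catch, as the question above explains, is that this only works where a connector exists; there is no built-in REST connector to attach the format to.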
Hi Steve,

Maybe you could implement a custom TableSource that queries the data from the REST API and converts the JSON directly into the Row data type. This would also avoid going through the DataStream API just for ingesting the data.

Best, Fabian

On Wed, Sep 4, 2019 at 15:57, Steve Robert <[hidden email]> wrote:
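A minimal sketch of such a TableSource, assuming Flink 1.9's StreamTableSource interface (the class itself, the polling loop, and fetchJsonAsRow() are hypothetical stand-ins for the real REST call and JSON-to-Row mapping):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.sources.StreamTableSource;
    import org.apache.flink.types.Row;

    public class RestJsonTableSource implements StreamTableSource<Row> {

        // Row type derived from an external JSON schema, e.g. via
        // JsonRowSchemaConverter.convert(jsonSchema).
        private final TypeInformation<Row> rowType;

        public RestJsonTableSource(TypeInformation<Row> rowType) {
            this.rowType = rowType;
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
            return env.addSource(new RestPollingSource(), rowType);
        }

        @Override
        public TypeInformation<Row> getReturnType() {
            return rowType;
        }

        @Override
        public TableSchema getTableSchema() {
            return TableSchema.fromTypeInfo(rowType);
        }

        /** Hypothetical polling source; fetchJsonAsRow() stands in for the HTTP call. */
        private static final class RestPollingSource implements SourceFunction<Row> {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                while (running) {
                    ctx.collect(fetchJsonAsRow());
                    Thread.sleep(1000L); // poll interval, placeholder
                }
            }

            @Override
            public void cancel() {
                running = false;
            }

            private static Row fetchJsonAsRow() {
                // Placeholder: a real implementation would call the REST API and
                // map the JSON fields into Row positions matching the schema.
                return Row.of(0.0, "1970-01-01T00:00:00Z");
            }
        }
    }

The source could then be registered with tEnv.registerTableSource("rest_table", new RestJsonTableSource(rowType)).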
Hi Fabian,

Thank you for your answer; it is indeed the solution that I am currently testing. I use

    TypeInformation<Row> convert = JsonRowSchemaConverter.convert(JSON_SCHEMA);

provided by flink-json, and pass the TypeInformation to the stream operator:

    DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() { ... });
    ...
    Table tableA = tEnv.fromDataStream(dataStreamRow);

It looks like it works :) With this solution my schema can live outside my package.

One additional question, about GenericInMemoryCatalog (https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/catalogs.html): can a catalog be used across multiple jobs running on the same cluster, or are catalogs scoped to the job session only?

On Thu, Sep 5, 2019 at 13:23, Fabian Hueske <[hidden email]> wrote:
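Spelled out end to end, that approach could look like the following sketch. JSON_SCHEMA, dataStream (a DataStream<String>), and tEnv are assumed to exist as above; JsonRowDeserializationSchema is flink-json's bundled JSON-to-Row deserializer, which avoids writing the field mapping by hand:

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.formats.json.JsonRowDeserializationSchema;
    import org.apache.flink.formats.json.JsonRowSchemaConverter;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;

    // Derive the Row type from the externalized JSON schema string.
    TypeInformation<Row> rowType = JsonRowSchemaConverter.convert(JSON_SCHEMA);

    // Build a deserializer from the same schema string.
    JsonRowDeserializationSchema deser =
        new JsonRowDeserializationSchema.Builder(JSON_SCHEMA).build();

    DataStream<Row> dataStreamRow = dataStream
        .map(json -> deser.deserialize(json.getBytes(StandardCharsets.UTF_8)))
        .returns(rowType); // keep the schema-derived type info for the Table API

    Table tableA = tEnv.fromDataStream(dataStreamRow);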
Hi Steve,

The in-memory catalog does not persist metadata and needs to be repopulated every time. However, you can implement a catalog that persists the metadata to a file or a database. There is also an effort to implement a Catalog interface on top of Hive's metastore; a preview is available in the latest release (1.9.0).

Best, Fabian

On Thu, Sep 5, 2019 at 14:52, Steve Robert <[hidden email]> wrote:
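A minimal sketch of wiring up that Hive metastore preview in 1.9, which would let table metadata be shared across jobs on the same cluster (catalog name, default database, configuration directory, and Hive version below are placeholders):

    import org.apache.flink.table.catalog.hive.HiveCatalog;

    // Metadata is stored in the Hive Metastore, so it survives the job
    // and is visible to other jobs that register the same catalog.
    HiveCatalog hive = new HiveCatalog(
        "myhive",              // catalog name
        "default",             // default database
        "/path/to/hive-conf",  // directory containing hive-site.xml
        "2.3.4");              // Hive version

    tEnv.registerCatalog("myhive", hive);
    tEnv.useCatalog("myhive");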
|