TABLE API + DataStream outsourcing schema or Pojo?

Steve Robert-2
Hi guys,

I have been studying the Table API for a while now, with a view to integrating it into my system.
Looking at this documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors


I understand that it is possible to apply a JSON format on the connector and supply a JSON schema, without any hardcoded Java POJO:
.jsonSchema(
      "{" +
      "  type: 'object'," +
      "  properties: {" +
      "    lon: {" +
      "      type: 'number'" +
      "    }," +
      "    rideTime: {" +
      "      type: 'string'," +
      "      format: 'date-time'" +
      "    }" +
      "  }" +
      "}"
    )
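
For reference, here is roughly how I imagine the full connector definition would look in 1.9 (the Kafka source, topic, and field list are just placeholders for my actual setup):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Schema;

    tEnv.connect(new Kafka()
            .version("universal")
            .topic("rides"))                            // placeholder topic
        .withFormat(new Json().jsonSchema(JSON_SCHEMA)) // the schema string above
        .withSchema(new Schema()
            .field("lon", DataTypes.DOUBLE())
            .field("rideTime", DataTypes.TIMESTAMP(3)))
        .inAppendMode()
        .registerTableSource("Rides");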


But my problem is the following: my data comes from a REST API, so I have to process the data and transmit it via a DataStream.
The issue is that the conversion from a DataStream to a Table must go through a Java POJO: DataStream<YourPojo> input; ... Table table = tEnv.fromDataStream(input);
I tried a workaround, converting my JSON to Avro using a GenericRecord, but that does not seem possible.

My use case is being able to add REST API processing at runtime, and to outsource and dynamically load my POJO/schema without hardcoding a Java POJO class.
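For context, the hardcoded approach I am trying to avoid looks like this (Ride is a made-up POJO for illustration):

    // The schema is fixed at compile time -- exactly what I want to avoid.
    public class Ride {
        public double lon;
        public String rideTime;
    }

    DataStream<Ride> input = env.addSource(new MyRestSource()); // hypothetical REST source
    Table table = tEnv.fromDataStream(input);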


Do you have an approach to suggest?


Thanks a lot

Re: TABLE API + DataStream outsourcing schema or Pojo?

Fabian Hueske-2
Hi Steve,

Maybe you could implement a custom TableSource that queries the data from the REST API and converts the JSON directly into a Row data type.
This would also avoid going through the DataStream API just for ingesting the data.
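
A rough sketch of what I have in mind (RestSourceFunction stands in for your REST polling logic, and the row type would come from your external schema):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.sources.StreamTableSource;
    import org.apache.flink.types.Row;

    public class RestApiTableSource implements StreamTableSource<Row> {

        private final TypeInformation<Row> rowType;

        public RestApiTableSource(TypeInformation<Row> rowType) {
            this.rowType = rowType; // e.g. derived from an external JSON schema
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
            // Poll the REST API and convert each JSON payload directly into a Row.
            return env.addSource(new RestSourceFunction()).returns(rowType);
        }

        @Override
        public TypeInformation<Row> getReturnType() {
            return rowType;
        }

        @Override
        public TableSchema getTableSchema() {
            return TableSchema.fromTypeInfo(rowType);
        }
    }

You could then register it with tEnv.registerTableSource("rest_data", new RestApiTableSource(rowType)); and query it like any other table.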

Best, Fabian


Re: TABLE API + DataStream outsourcing schema or Pojo?

Steve Robert-2
Hi Fabian,

Thank you for your answer; that is indeed the solution I am currently testing.
I use TypeInformation<Row> convert = JsonRowSchemaConverter.convert(JSON_SCHEMA); (provided by flink-json) and pass the resulting TypeInformation to the stream operator.
It looks like it works :) With this solution my schema can live outside my package.

One additional question, about GenericInMemoryCatalog (https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/catalogs.html):

Can a catalog be used across multiple jobs running on the same cluster, or is a catalog scoped to the job session only?

DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String s) throws Exception {
        // Parse the raw JSON string coming from the REST API.
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.readTree(s);
    }
});

DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
    @Override
    public Row map(JsonNode jsonNode) throws Exception {
        // Copy each top-level JSON field into the Row, position by position.
        int pos = 0;
        Row row = new Row(jsonNode.size());
        Iterator<String> iterator = jsonNode.fieldNames();
        while (iterator.hasNext()) {
            String key = iterator.next();
            row.setField(pos, jsonNode.get(key).asText());
            pos++;
        }
        return row;
    }
}).returns(convert); // the TypeInformation<Row> from JsonRowSchemaConverter

Table tableA = tEnv.fromDataStream(dataStreamRow);
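
With that in place, the table can be registered and queried like any other (the table name and query below are just illustrative):

    tEnv.registerTable("ApiData", tableA);
    Table result = tEnv.sqlQuery("SELECT * FROM ApiData");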


Re: TABLE API + DataStream outsourcing schema or Pojo?

Fabian Hueske-2
Hi Steve,

The in-memory catalog does not persist metadata and needs to be repopulated every time; it is scoped to the session.
However, you can implement a catalog that persists the metadata to a file or a database.

There is also an effort to implement the Catalog interface on top of Hive's Metastore.
A preview is available in the latest release (1.9.0).
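
For example, registering a Hive-backed catalog could look like this (the catalog name, conf directory, and Hive version are placeholders for your environment):

    import org.apache.flink.table.catalog.hive.HiveCatalog;

    // Metadata registered in this catalog is persisted in the Hive Metastore
    // and is therefore visible to other jobs on the same cluster.
    HiveCatalog hive = new HiveCatalog("myhive", "default", "/opt/hive-conf", "2.3.4");
    tEnv.registerCatalog("myhive", hive);
    tEnv.useCatalog("myhive");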

Best, Fabian
