Accessing pojo fields by name in flink

2 messages

Accessing pojo fields by name in flink

Frank Wilson
Hi,

So far I’ve been developing my flink pipelines using the datastream API. I have a pipeline that calculates windowed statistics on a given pojo field. Ideally I would like this field to be user configurable via a config file. To do this I would need to extract pojo fields by name. The Table API seems to have the facilities to do this. I gather datastreams can be converted to Tables. Is it sensible to convert to tables in the middle of my datastream pipeline to do this or is there a better way?
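For concreteness, the kind of by-name field access meant here can be sketched with plain Java reflection; the `Event` POJO and the "duration" field below are made-up placeholder names, not part of the real pipeline:

```java
import java.lang.reflect.Field;

public class FieldByName {
    // A made-up POJO standing in for the real pipeline event type.
    public static class Event {
        public long duration;
        public String host;
        public Event(long duration, String host) {
            this.duration = duration;
            this.host = host;
        }
    }

    // Read a public POJO field by its (config-supplied) name.
    static Object getField(Object pojo, String fieldName) throws Exception {
        Field f = pojo.getClass().getField(fieldName);
        return f.get(pojo);
    }

    public static void main(String[] args) throws Exception {
        Event e = new Event(42L, "node-1");
        String configuredField = "duration"; // would come from the config file
        System.out.println(getField(e, configuredField)); // prints 42
    }
}
```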

Thanks,

Frank
Re: Accessing pojo fields by name in flink

Steve Robert
Hi Frank,

I think I have a similar case: in my setup the POJO has to be configurable from the outside (driven by a REST API).
My strategy is to define the schema in JSON, and to convert the incoming JSON records into Flink Rows.

So for example, imagine your data is something like this:

public static String json = "{\"firstname\":\"steve\",\"name\":\"flink\"}";

I would define a JSON schema for it like this:
public static String JSON_SCHEMA = "{\n" +
        "  \"$schema\": \"http://json-schema.org/draft-04/schema#\",\n" +
        "  \"type\": \"object\",\n" +
        "  \"properties\": {\n" +
        "    \"firstname\": {\n" +
        "      \"type\": \"string\"\n" +
        "    },\n" +
        "    \"name\": {\n" +
        "      \"type\": \"string\"\n" +
        "    }\n" +
        "  },\n" +
        "  \"required\": [\n" +
        "    \"firstname\",\n" +
        "    \"name\"\n" +
        "  ]\n" +
        "}";

Then create the conversion of your data to Row like this (if you want to use the Table API):

import java.util.Iterator;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Derive Flink type information (field names + types) from the JSON schema
TypeInformation<Row> convert = JsonRowSchemaConverter.convert(JSON_SCHEMA);

// Parse each incoming JSON string into a Jackson tree
DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String s) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.readTree(s);
    }
});

// Copy the JSON fields into a Row, in the order they appear in the record,
// and attach the type information derived from the schema
DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
    @Override
    public Row map(JsonNode jsonNode) throws Exception {
        int pos = 0;
        Row row = new Row(jsonNode.size());
        Iterator<String> iterator = jsonNode.fieldNames();
        while (iterator.hasNext()) {
            String key = iterator.next();
            row.setField(pos, jsonNode.get(key).asText());
            pos++;
        }
        return row;
    }
}).returns(convert);

// convert DataStream to Table

Table tableA = tEnv.fromDataStream(dataStreamRow);
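Once you have the Table, the configurable field can then be picked out by name; a sketch (the `configuredField` value is a hypothetical stand-in for whatever your config file supplies, and this fragment assumes the `tEnv` and `tableA` from above):

```java
// field to extract, read from configuration rather than hard-coded
String configuredField = "firstname"; // e.g. loaded from a config file
Table projected = tableA.select(configuredField);
```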

Maybe this workflow can give you some ideas ^^

On Thu, 5 Sept 2019 at 11:14, Frank Wilson <[hidden email]> wrote: