Hi,
I am able to read *.gz files from an S3 folder. I want to
take only the first .gz file in the S3 folder, sort its records into an ordered map
as below, and use orderedMap.firstKey() as the first event timestamp. I then want to
pass this first event timestamp, together with a single current time as epoch time, to all TaskManagers.
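Here is a minimal plain-Java sketch of the "first .gz file, then first event timestamp" step, for illustration only. It makes three assumptions. First, a local directory stands in for the S3 folder. Second, "first" means the lexicographically smallest file name. Third, event_timestamp is a string in a fixed-width, sortable format such as ISO-8601, which matters because a TreeMap<String, ...> orders keys as strings. The regex-based EVENT_TS extraction is a hypothetical stand-in for fromJson and EVENT_TIMESTAMP. A single TreeMap collects every line of the file, so firstKey() returns the earliest timestamp in that file.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;

public class FirstGzTimestamp {

    // Hypothetical stand-in for fromJson(...) + EVENT_TIMESTAMP: extracts a
    // string-valued "event_timestamp" field from one JSON line with a regex.
    private static final Pattern EVENT_TS =
            Pattern.compile("\"event_timestamp\"\\s*:\\s*\"([^\"]+)\"");

    /** Assumption: "first" = lexicographically smallest *.gz file name in the directory. */
    public static Optional<Path> firstGzFile(Path dir) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files
                    .filter(p -> p.getFileName().toString().endsWith(".gz"))
                    .sorted()
                    .findFirst();
        }
    }

    /** Sorts all records of one .gz file by event timestamp and returns the smallest key. */
    public static String firstEventTimestamp(Path gzFile) throws IOException {
        // One TreeMap shared by every line of the file, so entries are ordered across records.
        TreeMap<String, String> ordered = new TreeMap<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(gzFile)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                Matcher m = EVENT_TS.matcher(line);
                if (m.find()) {
                    ordered.put(m.group(1), line);
                }
            }
        }
        if (ordered.isEmpty()) {
            throw new IllegalStateException("no event_timestamp found in " + gzFile);
        }
        return ordered.firstKey();
    }
}
```

Because this runs in the client program before the job is built, it yields one value that can then be handed to the job.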
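import java.util.Map;
import java.util.TreeMap;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

// Note: readTextFile on a folder reads every file in it, not only the first .gz file.
final DataStreamSource<String> stringDataStreamSource = env.readTextFile(s3Folder);
final SingleOutputStreamOperator<Map<String, Map<String, Object>>> orderedMapOutput =
        stringDataStreamSource.map(new MapFunction<String, Map<String, Map<String, Object>>>() {
            @Override
            public Map<String, Map<String, Object>> map(String jsonStr) throws Exception {
                logger.info("record written:{}", jsonStr); // logs the decompressed JSON line from the .gz file correctly
                Map<String, Object> resultMap = fromJson(jsonStr); // deserialize JSON
                // Key by event_timestamp so the TreeMap keeps entries sorted.
                // Note: a new TreeMap is created for every record, so each emitted map
                // holds at most one entry; records are not sorted against each other.
                TreeMap<String, Map<String, Object>> orderedMap = new TreeMap<>();
                if (resultMap != null) {
                    Object eventTsObj = resultMap.get(EVENT_TIMESTAMP);
                    if (eventTsObj != null) {
                        String eventTS = (String) eventTsObj;
                        orderedMap.put(eventTS, resultMap);
                    }
                } else {
                    logger.warn("Could not deserialize:{}", jsonStr);
                }
                return orderedMap;
            }
        });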
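One common way to pass the values to all TaskManagers is to compute the first event timestamp and System.currentTimeMillis() once in the client program and store them in final fields of the user function. This is a sketch, not the only option. Flink serializes the function instance when the job is submitted and deserializes a copy in every parallel task, so every task sees the same two values. The sketch below simulates that round trip with plain Java serialization. TagWithJobConstants and roundTrip are hypothetical names. In a real job the class would implement Flink's MapFunction<String, String> and be passed to stringDataStreamSource.map(...).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ShipConstantsSketch {

    /**
     * Hypothetical stand-in for a Flink user function. Values held in final
     * fields are serialized with the function on the client and restored
     * identically in every parallel task.
     */
    public static class TagWithJobConstants implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String firstEventTs;
        private final long jobStartEpochMillis;

        public TagWithJobConstants(String firstEventTs, long jobStartEpochMillis) {
            this.firstEventTs = firstEventTs;
            this.jobStartEpochMillis = jobStartEpochMillis;
        }

        /** Prefixes each record with the two job-wide constants. */
        public String map(String record) {
            return firstEventTs + "|" + jobStartEpochMillis + "|" + record;
        }
    }

    /** Simulates shipping a function instance to a task via Java serialization. */
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T fn) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

The key design choice is reading the clock once on the client. Calling System.currentTimeMillis() inside open() or map() would give each task its own, slightly different reading. Flink's global job parameters (ExecutionConfig#setGlobalJobParameters) are another way to ship small constants like these.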
TIA,