/**
 * Flat-map function that deserializes each incoming JSON line into an
 * {@link XSightMessage} and forwards it to the collector.
 */
@Log4j2
public class FlatMapXSightMsgProcessor extends RichFlatMapFunction<String, XSightMessage> {

    /**
     * JSON mapper reused for every record instead of being rebuilt per call;
     * constructing an {@code ObjectMapper} is costly and a configured instance
     * can be reused. Kept {@code static} so it is not part of the instance
     * state of this function.
     */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Get DataSet Line and create XSightMessage.
     *
     * @param line - a line from the data set
     * @param collector - The created XSightMessage
     * @throws RuntimeException if the line cannot be parsed as an
     *         {@code XSightMessage}; the underlying Jackson exception is
     *         attached as the cause
     */
    @Override
    public void flatMap(String line, Collector<XSightMessage> collector) {
        try {
            XSightMessage message = OBJECT_MAPPER.readValue(line, XSightMessage.class);
            collector.collect(message);
        } catch (JsonProcessingException e) {
            // Pass the exception itself so the stack trace reaches the log.
            log.error(e.getMessage(), e);
            // Preserve the cause so callers can see why parsing failed.
            throw new RuntimeException(
                    "The line is not in the expected format of XSightMessage class object: " + line, e);
        }
    }