@Log4j2 public final class AggregationWriterFlinkProcess { /** * Get the InputData. * * @return - the Input Data */ public static AggregationWriterInputData getInputData() { return INPUT_DATA; } private static final AggregationWriterInputData INPUT_DATA = new AggregationWriterInputData(); private AggregationWriterFlinkProcess() { } /** * The flink job main function. * * @param args - process input arguments: * tenantId - the tenant Id * correlationId - the log correlation Id * datasetId - the Data Set Id * s3FolderLocation - The Data Set s3 Folder location * s3OutputPath - full output path to upload Data Store files *

* For Example: * -tenantId * -correlationId * -datasetId dataset-id-1 * -folderLocation */ public static void main(String[] args) throws Exception { CommandLineParser parser = new DefaultParser(); try { log.info("Starting Aggregation Batch Flink Job"); CommandLine commandLine = parser.parse(INPUT_DATA.getOptions(), args); INPUT_DATA.validateArgs(commandLine); ParameterTool parameters = ParameterTool.fromArgs(args); ThreadContext.put("TenantId", INPUT_DATA.getTenantId()); ThreadContext.put("LogCorrelationId", INPUT_DATA.getCorrelationId()); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(parameters); log.info("Read data from: " + INPUT_DATA.getS3FolderLocation()); DataSet lines = env.readTextFile("s3://" + INPUT_DATA.getS3FolderLocation()); log.debug("Process data"); DataSet xsightMessages = lines.flatMap(new FlatMapXSightMsgProcessor()); xsightMessages.print(); env.execute(); } catch (IllegalArgumentException e) { log.error(String.format("The Batch Job received invalid arguments: %s", e.getMessage()), e); throw e; } catch (Exception e) { log.error("Unexpected exception: " + e.getMessage()); throw e; } finally { try { AwsSdkMetrics.unregisterMetricAdminMBean(); } catch (Exception e) { log.error("Could not unregister metric"); } ThreadContext.clearAll(); } } }