@Log4j2
public final class AggregationWriterFlinkProcess {

    private static final AggregationWriterInputData INPUT_DATA = new AggregationWriterInputData();

    /** Utility/driver class — not instantiable. */
    private AggregationWriterFlinkProcess() {
    }

    /**
     * Get the InputData.
     *
     * @return - the Input Data
     */
    public static AggregationWriterInputData getInputData() {
        return INPUT_DATA;
    }

    /**
     * The flink job main function.
     *
     * @param args - process input arguments:
     *             tenantId - the tenant Id
     *             correlationId - the log correlation Id
     *             datasetId - the Data Set Id
     *             s3FolderLocation - The Data Set s3 Folder location
     *             s3OutputPath - full output path to upload Data Store files
     *
     *             For Example:
     *             -tenantId
     *             -correlationId
     *             -datasetId dataset-id-1
     *             -folderLocation
     */
// Entry point for the Aggregation Batch Flink job: parses and validates CLI
// arguments, tags the logging context, and configures the Flink execution
// environment before building the data pipeline.
public static void main(String[] args) throws Exception {
// Commons-CLI parser for the option set declared on INPUT_DATA.
CommandLineParser parser = new DefaultParser();
try {
log.info("Starting Aggregation Batch Flink Job");
// Parse the raw args against the declared options, then let INPUT_DATA
// enforce its required-argument rules (throws/handles on invalid input).
CommandLine commandLine = parser.parse(INPUT_DATA.getOptions(), args);
INPUT_DATA.validateArgs(commandLine);
// The same args are re-read through Flink's ParameterTool so they can be
// registered as global job parameters below.
ParameterTool parameters = ParameterTool.fromArgs(args);
// Tag every subsequent log line with the tenant and correlation ids via
// the Log4j2 ThreadContext (MDC-style context map).
ThreadContext.put("TenantId", INPUT_DATA.getTenantId());
ThreadContext.put("LogCorrelationId", INPUT_DATA.getCorrelationId());
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Expose the CLI parameters to the job's runtime configuration.
env.getConfig().setGlobalJobParameters(parameters);
log.info("Read data from: " + INPUT_DATA.getS3FolderLocation());
// NOTE(review): source is truncated here — the DataSet pipeline that reads
// from the S3 folder location continues beyond this chunk.
DataSet