StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(true); Map sinkConfig = getSinkConfig(); // Form msgIN & msgOUT Stream DataStream stream1 = // consume from kafka topic 1 KeyedStream stream1KeyedStream = stream1.keyBy("id"); DataStream stream2 = // consume from kafka topic 2 KeyedStream stream2KeyedStream = stream2.keyBy("id"); // Write Raw to S3 //writeRawMessageToS3(stream1, stream2, sinkConfig); stream1.writeAsText("msgInRaw.txt", FileSystem.WriteMode.OVERWRITE); stream2.writeAsText("msgOutRaw.txt", FileSystem.WriteMode.OVERWRITE); // Union the two streams DataStream unionStream = stream1KeyedStream.union(stream2KeyedStream); // Assign Timestamp and watermarks on the unionStream SingleOutputStreamOperator timestampsAndWatermarksStream = unionStream.assignTimestampsAndWatermarks(new MessageTimestamp(Time.milliseconds(1500))); SingleOutputStreamOperator> matchingAndNonMatchingMsgStream = timestampsAndWatermarksStream .keyBy("some_key") .timeWindow(Time.seconds(1)) .apply(new OuterJoinOperator()); SingleOutputStreamOperator> someDataStream = matchingAndNonMatchingMsgStream.flatMap(new SomeMessageExtractor()); SplitStream> split = someDataStream .split(new SomeErrorSplitter()); DataStream> errorStream = split.select("ErrorTuples"); DataStream> nonErrorStream = split .select("NonErrorTuples"); // S3 in Error Data Folder //writeErrorMessageToS3(errorStream, sinkConfig); errorStream.writeAsText("errorStream.txt", FileSystem.WriteMode.OVERWRITE); // Non Fix Error Messages - Create generic Record --> Data Enrichment // --> S3 in Processed Data Folder SingleOutputStreamOperator genericRecordStream = nonErrorStream .flatMap(new GenericRecordExtractor()); SplitStream valueErrorSplit = genericRecordStream.split(new ValueErrorSplitter()); DataStream errorRecords = valueErrorSplit.select("ErrorRecords"); DataStream nonErrorRecords = valueErrorSplit.select("NonErrorRecords"); //errorRecords.addSink(createRollingSinkForErrorData(sinkConfig)); errorRecords.writeAsText("errorRecords.txt", FileSystem.WriteMode.OVERWRITE); DataStream processedRecords = nonErrorRecords.map(new ReferenceDataExtractor()); //writeOrderEntryMessageToS3(processedRecords, sinkConfig); processedRecords.writeAsText("processedRecords.txt", FileSystem.WriteMode.OVERWRITE); // Filter Orphan Message Stream DataStream nonMatchingRecords = processedRecords.filter(new NonMatchingFilter()); //nonMatchingRecords.addSink(createRollingSinkForOrphanData(sinkConfig)); nonMatchingRecords.writeAsText("nonMatchingRecords", FileSystem.WriteMode.OVERWRITE); env.execute();