import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import com.google.common.collect.Iterables;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.name.Named;

// Project-local classes (EnvironmentModule, KivaPickWorkflowProcessorModule,
// KinesisConnectorWrapper, MilestoneBatchFlatMap, MilestoneEvent,
// MilestoneAggregation, ElasticsearchJestSinkWrapper) are assumed to be on
// the classpath; their imports are omitted here.

class Example {

    private static final String AWS_ES_INDEX_NAME = "milestone-aggregation-index";
    private static final String AWS_ES_DOCUMENT_TYPE = "milestone-aggregation-record";
    private static final String TIMER_NAME = "timerName";
    private static final String TIMER_START = "longTimer.start";
    private static final String TIMER_END = "longTimer.end";
    private static final String TIMER_PAUSE = "longTimer.pause";
    private static final String TIMER_RESUME = "longTimer.resume";
    private static final String INPUT_METRIC_NAME = "TimeWaitingForActivity";
    private static final String OUTPUT_METRIC_NAME = "TimeWaitingForActivity";

    // Populated via Guice static injection; EnvironmentModule is assumed to
    // request static injection for this class.
    @Inject
    @Named(EnvironmentModule.FLINK_CONFIG_KEY)
    private static Map<String, String> flinkConfig;

    public static void main(String[] args) throws Exception {
        Injector injector = Guice.createInjector(new EnvironmentModule(args),
                new KivaPickWorkflowProcessorModule());

        // Retrieve the execution environment for the running cluster and
        // configure event-time processing with filesystem-backed checkpoints.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(
                Integer.parseInt(flinkConfig.get(EnvironmentModule.CHECKPOINT_INTERVAL_MILLIS_KEY)));
        env.setStateBackend(injector.getInstance(FsStateBackend.class));

        // Register the Kinesis source with the environment. The element type
        // is assumed to be the raw record string emitted by the connector.
        KinesisConnectorWrapper kinesisConnector = injector.getInstance(KinesisConnectorWrapper.class);
        kinesisConnector.addSourceToEnv(env);
        DataStream<String> source = kinesisConnector.getSource();

        // Flatten each batch into individual MilestoneEvent records and keep
        // only the timer we care about. The metric name is compared
        // null-safely, since the data map may not contain the key.
        DataStream<MilestoneEvent> metricEvent = source
                .flatMap(injector.getInstance(MilestoneBatchFlatMap.class))
                .filter(evt -> evt.getData() != null
                        && !evt.getData().isEmpty()
                        && INPUT_METRIC_NAME.equals(evt.getData().get(TIMER_NAME)));

        // Assign event-time timestamps and bounded out-of-orderness watermarks.
        DataStream<MilestoneEvent> eventInOrder = metricEvent
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MilestoneEvent>(
                        Time.seconds(Integer.parseInt(
                                flinkConfig.get(EnvironmentModule.STREAM_OUT_OF_ORDER_TIME_KEY)))) {
                    private static final long serialVersionUID = 5795391437252719084L;

                    @Override
                    public long extractTimestamp(MilestoneEvent element) {
                        return element.getTimeStamp().getTime();
                    }
                });

        // Partition by picker and warehouse so each timer sequence is matched
        // within a single key.
        DataStream<MilestoneEvent> partitionedEvent = eventInOrder.keyBy(
                new KeySelector<MilestoneEvent, String>() {
                    private static final long serialVersionUID = 699212529568331455L;

                    @Override
                    public String getKey(MilestoneEvent event) {
                        return event.getPickerId() + event.getWarehouseId();
                    }
                });

        // Define the CEP pattern: a timer start, followed by zero or more
        // consecutive pause/resume events, followed by a timer end, all
        // within the configured timeout.
        Pattern<MilestoneEvent, ?> kivaWaitForActivityPattern = Pattern
                .<MilestoneEvent>begin("start")
                .where(new SimpleCondition<MilestoneEvent>() {
                    private static final long serialVersionUID = 5407961589198363269L;

                    @Override
                    public boolean filter(MilestoneEvent event) {
                        return event.getMilestoneType().equals(TIMER_START);
                    }
                })
                .next("pauseOrResume").oneOrMore().optional().consecutive()
                .where(new SimpleCondition<MilestoneEvent>() {
                    private static final long serialVersionUID = -1947909131821791451L;

                    @Override
                    public boolean filter(MilestoneEvent event) {
                        return event.getMilestoneType().equals(TIMER_PAUSE)
                                || event.getMilestoneType().equals(TIMER_RESUME);
                    }
                })
                .next("end")
                .where(new SimpleCondition<MilestoneEvent>() {
                    private static final long serialVersionUID = 1562355907068840844L;

                    @Override
                    public boolean filter(MilestoneEvent event) {
                        return event.getMilestoneType().equals(TIMER_END);
                    }
                })
                .within(Time.seconds(Integer.parseInt(
                        flinkConfig.get(EnvironmentModule.PATTERN_DETECT_TIMEOUT_KEY))));

        PatternStream<MilestoneEvent> patternStream =
                CEP.pattern(partitionedEvent, kivaWaitForActivityPattern);

        DataStream<MilestoneAggregation> waitForActivityTime = patternStream
                .select(new PatternSelectFunction<MilestoneEvent, MilestoneAggregation>() {
                    private static final long serialVersionUID = 4832891120845616503L;

                    @Override
                    public MilestoneAggregation select(Map<String, List<MilestoneEvent>> pattern)
                            throws Exception {
                        MilestoneEvent timerStart = Iterables.getFirst(pattern.get("start"), null);
                        List<MilestoneEvent> timerPauseOrResumeList = pattern.get("pauseOrResume");
                        MilestoneEvent timerEnd = Iterables.getFirst(pattern.get("end"), null);

                        // Running intervals are bounded by start/resume events
                        // on one side and pause/end events on the other.
                        List<MilestoneEvent> startList = new ArrayList<>();
                        List<MilestoneEvent> endList = new ArrayList<>();
                        startList.add(timerStart);

                        // Collapse duplicated pause or resume events: only the
                        // first event of each consecutive run of the same type
                        // is kept.
                        int eventIndex = 0;
                        while (timerPauseOrResumeList != null
                                && eventIndex < timerPauseOrResumeList.size()) {
                            MilestoneEvent currentEvent = timerPauseOrResumeList.get(eventIndex);
                            String currentState = currentEvent.getMilestoneType();
                            if (currentState.equals(TIMER_PAUSE)) {
                                endList.add(currentEvent);
                            } else if (currentState.equals(TIMER_RESUME)) {
                                startList.add(currentEvent);
                            }
                            int tmpIndex = eventIndex + 1;
                            while (tmpIndex < timerPauseOrResumeList.size()
                                    && timerPauseOrResumeList.get(tmpIndex)
                                            .getMilestoneType().equals(currentState)) {
                                tmpIndex++;
                            }
                            eventIndex = tmpIndex;
                        }
                        endList.add(timerEnd);

                        // Accumulate the running time over each start/end
                        // interval; timestamps are already in milliseconds.
                        long accumulatedMillis = 0L;
                        for (int i = 0; i < Math.min(startList.size(), endList.size()); i++) {
                            accumulatedMillis += endList.get(i).getTimeStamp().getTime()
                                    - startList.get(i).getTimeStamp().getTime();
                        }

                        Date processTime = new Date();
                        Map<String, Date> data = new HashMap<>();
                        data.put("pickStart", timerStart.getDate());
                        data.put("pickEnd", timerEnd.getDate());
                        return new MilestoneAggregation(timerStart.getWarehouseId(),
                                timerStart.getPickerId(), OUTPUT_METRIC_NAME, accumulatedMillis,
                                processTime, timerEnd.getDate(), data, UUID.randomUUID());
                    }
                });

        // Write the aggregations to the Elasticsearch sink.
        ElasticsearchJestSinkWrapper elasticsearchConnector =
                injector.getInstance(ElasticsearchJestSinkWrapper.class);
        elasticsearchConnector.addSourceToSink(waitForActivityTime, AWS_ES_INDEX_NAME,
                AWS_ES_DOCUMENT_TYPE);

        // Debugging aids:
        // source.print();
        // partitionedEvent.print();
        // waitForActivityTime.print();

        env.execute();
    }
}
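/*
 * A minimal sketch of the event model the job above relies on. The real
 * MilestoneEvent class is project-specific and was not included in the
 * original snippet; the fields and types below are inferred from the
 * getters the pipeline calls (getMilestoneType, getPickerId, getWarehouseId,
 * getTimeStamp, getDate, getData) and are assumptions, not the actual
 * definition.
 */
class MilestoneEventSketch implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    private String milestoneType;     // e.g. "longTimer.start", "longTimer.pause"
    private String pickerId;          // combined with warehouseId as the CEP key
    private String warehouseId;
    private Date timeStamp;           // event time, read by the watermark extractor
    private Date date;                // business date copied into the aggregation
    private Map<String, String> data; // carries "timerName" for the metric filter

    public String getMilestoneType() { return milestoneType; }
    public String getPickerId() { return pickerId; }
    public String getWarehouseId() { return warehouseId; }
    public Date getTimeStamp() { return timeStamp; }
    public Date getDate() { return date; }
    public Map<String, String> getData() { return data; }
}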