package org.zemoso.flink.helloworld;

import java.util.Map;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Small Flink CEP demo that looks for patients with two consecutive events
 * that each satisfy {@link SirsFilterFunction}.
 *
 * <p>The job feeds six hard-coded {@code PatientEvent}s into a stream keyed by
 * {@code patientId}, matches the pattern "sirsCriteria1 followed by
 * sirsCriteria2 within 24 hours", and writes the patient id of every match as
 * text to {@link #OUTPUT_PATH}.
 */
public class SepsisDemo {

    /** Destination of the matched patient ids; existing output is overwritten. */
    private static final String OUTPUT_PATH = "file:///C:/Zemoso/tmp/sirs";

    /** Name of the first pattern stage; also the key used to read it back from a match. */
    private static final String FIRST_CRITERIA = "sirsCriteria1";

    /** Name of the second pattern stage. */
    private static final String SECOND_CRITERIA = "sirsCriteria2";

    /**
     * Builds and runs the demo job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Two events per patient; every setter targets the event created just above it.
        PatientEvent event1 = new PatientEvent(1);
        event1.setTemperature(39);
        PatientEvent event2 = new PatientEvent(1);
        event2.setHeartRate(91);

        PatientEvent event3 = new PatientEvent(2);
        event3.setTemperature(37);
        PatientEvent event4 = new PatientEvent(2);
        event4.setHeartRate(91);

        PatientEvent event5 = new PatientEvent(3);
        event5.setTemperature(37);
        PatientEvent event6 = new PatientEvent(3);
        event6.setTemperature(35);

        DataStream<PatientEvent> inputEventStream = env.fromElements(
                event1, event2, event3, event4, event5, event6);

        // Two events that both pass the filter, the second following the first,
        // no more than 24 hours apart.
        Pattern<PatientEvent, ?> sirsCriteriaPattern = Pattern.<PatientEvent>begin(FIRST_CRITERIA)
                .where(new SirsFilterFunction())
                .followedBy(SECOND_CRITERIA)
                .where(new SirsFilterFunction())
                .within(Time.hours(24));

        // Key by patient so that a match only combines events of the same patient.
        PatternStream<PatientEvent> sirsPatternStream = CEP.pattern(
                inputEventStream.keyBy("patientId"),
                sirsCriteriaPattern);

        // Reduce every match to the patient id taken from its first matched event.
        DataStream<String> sirsPatientStream = sirsPatternStream.select(
                (Map<String, PatientEvent> pattern) -> {
                    PatientEvent sirsCriteria = pattern.get(FIRST_CRITERIA);
                    return String.valueOf(sirsCriteria.getPatientId());
                });

        sirsPatientStream.writeAsText(OUTPUT_PATH, WriteMode.OVERWRITE);

        env.execute("SEPSIS");
    }

    /**
     * Accepts a {@code PatientEvent} when at least one of its vital signs is
     * outside the limits below.
     *
     * <p>NOTE(review): the limits are plain numbers in the code; the units are
     * presumably degrees Celsius, beats per minute and breaths per minute -
     * confirm against {@code PatientEvent}.
     *
     * <p>NOTE(review): an event whose temperature was never set is compared
     * using whatever default {@code PatientEvent} assigns; if that default is
     * below {@link #MIN_TEMPERATURE} the event will be accepted - confirm this
     * is intended.
     */
    public static class SirsFilterFunction implements FilterFunction<PatientEvent> {

        /** Temperatures strictly above this value are accepted. */
        private static final int MAX_TEMPERATURE = 38;

        /** Temperatures strictly below this value are accepted. */
        private static final int MIN_TEMPERATURE = 36;

        /** Heart rates strictly above this value are accepted. */
        private static final int MAX_HEART_RATE = 90;

        /** Respiratory rates strictly above this value are accepted. */
        private static final int MAX_RESPIRATORY_RATE = 20;

        /**
         * Tests a single event against the limits.
         *
         * @param event the event to test
         * @return {@code true} if the temperature is above {@code 38} or below
         *         {@code 36}, the heart rate is above {@code 90}, or the
         *         respiratory rate is above {@code 20}; {@code false} otherwise
         * @throws Exception declared by the {@code FilterFunction} contract; not
         *         thrown directly by this implementation
         */
        @Override
        public boolean filter(PatientEvent event) throws Exception {
            return event.getTemperature() > MAX_TEMPERATURE
                    || event.getTemperature() < MIN_TEMPERATURE
                    || event.getHeartRate() > MAX_HEART_RATE
                    || event.getRespiratoryRate() > MAX_RESPIRATORY_RATE;
        }
    }
}