/**
 * Flink streaming job: consumes string records from the Kafka topic {@code test},
 * assigns event-time timestamps taken from {@code Model#getServerTime()}, and uses
 * Flink CEP to emit every record whose text contains {@code "Start"}.
 *
 * <p>Both the enriched input stream and the CEP matches are printed, prefixed with
 * {@code Stream} and {@code Result} respectively.
 *
 * <p>Because the pattern runs in event time, CEP buffers incoming elements and only
 * evaluates them once the watermark has passed their timestamp. With a bounded
 * out-of-orderness of {@link #MAX_OUT_OF_ORDERNESS}, a match for an event at time
 * {@code t} is emitted only after a later event with a timestamp beyond
 * {@code t + MAX_OUT_OF_ORDERNESS} has been seen.
 */
public class Main {

    /** Shared JSON parser; reused instead of allocating a new one for every record. */
    private static final Gson GSON = new Gson();

    /** How far events may arrive out of order before they are considered late. */
    private static final Duration MAX_OUT_OF_ORDERNESS = Duration.ofSeconds(20);

    /**
     * After this long without records, an input is marked idle so that it no longer
     * holds back the watermark of the whole job.
     */
    private static final Duration IDLE_TIMEOUT = Duration.ofSeconds(30);

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or fails during execution
     */
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("group.id", "Flink");
        properties.put("bootstrap.servers", "broker:9092");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "test",
                new SimpleStringSchema(),
                properties);

        // Without idleness, a source subtask (or Kafka partition) that receives no data
        // never advances its watermark, which stalls the downstream event-time CEP operator.
        WatermarkStrategy<String> watermarks = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(MAX_OUT_OF_ORDERNESS)
                .withTimestampAssigner((event, timestamp) -> {
                    // NOTE(review): Flink expects epoch milliseconds here - confirm the
                    // unit returned by getServerTime().
                    Model model = GSON.fromJson(event, Model.class);
                    return model.getServerTime();
                })
                .withIdleness(IDLE_TIMEOUT);

        DataStream<String> stream = env
                .addSource(consumer)
                .map((MapFunction<String, String>) s -> {
                    // Build the object model from the raw record; `model` was previously
                    // referenced here without ever being declared.
                    // NOTE(review): the timestamp assigner parses this map's output as
                    // JSON again, so Model#toString() must produce JSON - confirm.
                    Model model = GSON.fromJson(s, Model.class);
                    return model.toString();
                })
                .assignTimestampsAndWatermarks(watermarks);
        stream.print("Stream");

        // Single-step pattern: any element containing "Start" is a complete match.
        Pattern<String, String> firstPattern = Pattern
                .<String>begin("first")
                .where(new IterativeCondition<String>() {
                    @Override
                    public boolean filter(String s, Context<String> context) throws Exception {
                        return s.contains("Start");
                    }
                });

        DataStream<String> result = CEP
                .pattern(stream, firstPattern)
                .inEventTime() // default TimeCharacteristic for 1.12
                .process(new PatternProcessFunction<String, String>() {
                    @Override
                    public void processMatch(
                            Map<String, List<String>> map,
                            Context context,
                            Collector<String> collector) throws Exception {
                        // "first" is the only stage, and it matches exactly one element.
                        collector.collect(map.get("first").get(0));
                    }
                });
        result.print("Result");

        env.execute();
    }
}
Please help me correct the code.
Thanks, Yuri L.