package eu.euranova.leadcep; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; import org.apache.flink.util.Collector; public class AND extends RichCoFlatMapFunction> { private transient ListState leftState; private transient ListState rightState; @Override public void open(Configuration config) { ListStateDescriptor left_descriptor = new ListStateDescriptor<>( "and_left", TypeInformation.of(new TypeHint() { })); leftState = getRuntimeContext().getListState(left_descriptor); ListStateDescriptor right_descriptor = new ListStateDescriptor<>( "and_left", TypeInformation.of(new TypeHint() { })); rightState = getRuntimeContext().getListState(right_descriptor); } @Override public void flatMap1(LEFT leftInput, Collector> out) throws Exception { this.leftState.add(leftInput); Iterable rightList = this.rightState.get(); rightList.forEach(rightEvent -> out.collect(Tuple2.of(leftInput, rightEvent))); } @Override public void flatMap2(RIGHT rightInput, Collector> out) throws Exception { this.rightState.add(rightInput); Iterable leftList = this.leftState.get(); leftList.forEach(leftEvent -> out.collect(Tuple2.of(leftEvent, rightInput))); } }