package eu.euranova.leadcep; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.Test; class TypeA extends Tuple3 {}; class TypeB extends Tuple3 {}; public class Main { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); DataStream A = env.fromElements( new TestType(1L, 1L, "a"), new TestType(2L, 2L, "b"), new TestType(3L, 3L, "d") ); DataStream B = env.fromElements( new TestType(1L, 1L, "a"), new TestType(2L, 2L, "c"), new TestType(3L, 3L, "d") ); A.print("A"); B.print("B"); KeySelector keySelector = new KeySelector() { @Override public Object getKey(TestType value) throws Exception { return value.getF2(); } }; A .connect(B) .keyBy(keySelector, keySelector) .flatMap(new AND()) //.returns(new TypeHint>() {}) .print("A and B"); /* DataStream> C = env.fromElements( Tuple3.of(1L, 3L, "d"), Tuple3.of(2L, 2L, "d"), Tuple3.of(3L, 2L, "d"), Tuple3.of(4L, 3L, "d"), Tuple3.of(5L, 2L, "d") ); C.print("C"); C .keyBy(2) .flatMap(new CountWindowAverage()) .print("Average(A)"); */ env.execute(); } }