StreamExecutionEnvironment streamExecutionEnvironment = ...; DataStream carStream = streamExecutionEnvironment.addSource(...); DataStream brandStream = streamExecutionEnvironment.addSource(...); StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment, ...); streamTableEnvironment.registerFunction("LAST_VALUE_STRING", new LastValueAggFunction.StringLastValueAggFunction()); streamTableEnvironment.registerFunction("LAST_VALUE_LONG", new LastValueAggFunction.LongLastValueAggFunction()); streamTableEnvironment.registerFunction("LAST_VALUE_BOOLEAN", new LastValueAggFunction.BooleanLastValueAggFunction()); Table carTable = streamTableEnvironment.fromDataStream(carStream) .groupBy($("carId")) .select( $("carId").as("c_carId"), call("LAST_VALUE_LONG", $("brandId")).as("c_brandId"), call("LAST_VALUE_LONG", $("serialNumber")).as("c_serialNumber"), call("LAST_VALUE_STRING", $("carName")).as("c_carName") ); Table brandTable = streamTableEnvironment.fromDataStream(brandStream) .groupBy($("brandId")) .select( $("brandId").as("b_brandId"), call("LAST_VALUE_STRING", $("brandName")).as("b_brandName") ); Table brandCarTable = carTable.join(brandTable) .where($("c_brandId").isEqual($("b_brandId"))) .groupBy($("c_carId")) .select( $("c_carId").as("carId"), call("LAST_VALUE_LONG", $("b_brandId")).as("brandId"), call("LAST_VALUE_LONG", $("c_serialNumber")).as("serialNumber"), call("LAST_VALUE_STRING", $("c_carName")).as("carName"), call("LAST_VALUE_STRING", $("b_brandName")).as("brandName") ); DataStream brandCarStream = streamTableEnvironment.toRetractStream(brandCarTable, BrandCar.class) .filter(flaggedJoin -> flaggedJoin.f0) .map(changedJoin -> changedJoin.f1) .flatMap(...); brandCarStream.addSink(...) .name(...); streamExecutionEnvironment.execute("Brand Car Join");