/* * Copyright (c) 2019 Percentile Limited. All rights reserved. */ package test; import java.util.Arrays; import java.util.List; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.functions.TemporalTableFunction; import org.apache.flink.types.Row; public class FlinkTest { public static void main(String[] args) throws Exception { FlinkTest test = new FlinkTest(); test.run(); } private final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); private final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv); public void run() throws Exception { streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); List fxRateData = Arrays.asList( new DoubleLookup(0L, "USD", 1.0d), new DoubleLookup(0L, "GBP", 1.31d), new DoubleLookup(0L, "EUR", 1.12d), new DoubleLookup(70L, "EUR", 1.11d) ); List priceData = Arrays.asList( new DoubleLookup(0L, "A1", 1.99d), new DoubleLookup(0L, "A2", 2.99d), new DoubleLookup(0L, "A3", 9.50d), new DoubleLookup(0L, "A4", 1.25d), new DoubleLookup(100L, "A1", 1.75d) ); List orderData = Arrays.asList( new Order(10L, 1, "A1", "USD", 1), new Order(25L, 2, "A3", "GBP", 2), new Order(40L, 3, "A2", "EUR", 1), new Order(50L, 4, "A1", "GBP", 10), new Order(75L, 5, "A4", "EUR", 10), new Order(105L, 6, "A1", "EUR", 10) ); Table fxRateTable = createTable("FxRates", "id AS currency, value AS rate", fxRateData); TemporalTableFunction fxRateLookup = fxRateTable.createTemporalTableFunction("rowtime", "currency"); tableEnv.registerFunction("FxRateLookup", fxRateLookup); Table priceTable = createTable("Prices", "id AS productId, value AS price", priceData); TemporalTableFunction priceLookup = priceTable.createTemporalTableFunction("rowtime", "productId"); tableEnv.registerFunction("PriceLookup", priceLookup); createTable("Orders", "id, productId, currency, quantity", orderData); // This works String fxOnlySql = "SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate" + " FROM Orders AS o," + " LATERAL TABLE (FxRateLookup(o.rowtime)) AS f" + " WHERE o.currency = f.currency"; Table ordersWithFx = tableEnv.sqlQuery(fxOnlySql); ordersWithFx.printSchema(); tableEnv.toAppendStream(ordersWithFx, Row.class).addSink(new PrintSinkFunction<>("FX Rate", false)); // This also works String priceOnlySql = "SELECT o.id AS orderId, o.productId, o.currency, o.quantity, p.price" + " FROM Orders AS o," + " LATERAL TABLE (PriceLookup(o.rowtime)) AS p" + " WHERE o.productId = p.productId"; Table ordersWithPrice = tableEnv.sqlQuery(priceOnlySql); ordersWithPrice.printSchema(); tableEnv.toAppendStream(ordersWithPrice, Row.class).addSink(new PrintSinkFunction<>("Price", false)); // This doesn't work - fails with NullPointerException String bothSql = "SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price" + " FROM Orders AS o," + " LATERAL TABLE (FxRateLookup(o.rowtime)) AS f," + " LATERAL TABLE (PriceLookup(o.rowtime)) AS p" + " WHERE o.currency = f.currency" + " AND o.productId = p.productId"; Table ordersWithBoth = tableEnv.sqlQuery(bothSql); ordersWithBoth.printSchema(); tableEnv.toAppendStream(ordersWithBoth, Row.class).addSink(new PrintSinkFunction<>("Full", false)); streamEnv.execute(); } private Table createTable(String tableName, String fieldNames, List data) { EventTimeExtractor timeExtractor = new EventTimeExtractor<>(Time.milliseconds(10)); DataStream fxRateStream = streamEnv.fromCollection(data).assignTimestampsAndWatermarks(timeExtractor); Table table = tableEnv.fromDataStream(fxRateStream, fieldNames + ", rowtime.rowtime"); tableEnv.registerTable(tableName, table); table.printSchema(); return table; } public static class EventTimeExtractor extends BoundedOutOfOrdernessTimestampExtractor { public EventTimeExtractor(Time maxOutOfOrderness) { super(maxOutOfOrderness); } @Override public long extractTimestamp(T element) { return element.eventTime; } } public static class TimedEvent { public long eventTime; public TimedEvent() { } public TimedEvent(long eventTime) { this.eventTime = eventTime; } } public static class DoubleLookup extends TimedEvent { public String id; public double value; public DoubleLookup() { } public DoubleLookup(long eventTime, String id, double value) { super(eventTime); this.id = id; this.value = value; } @Override public String toString() { return "DoubleLookup{" + "eventTime=" + eventTime + ", id='" + id + '\'' + ", value='" + value + '\'' + '}'; } } public static class Order extends TimedEvent { public int id; public String productId; public String currency; public int quantity; public Order() { } public Order(long eventTime, int id, String productId, String currency, int quantity) { super(eventTime); this.id = id; this.productId = productId; this.currency = currency; this.quantity = quantity; } } }