Hi all,

I'm new to Flink, so I'm probably missing something simple. I'm using Flink 1.7.1 and am trying to use temporal table functions, but I'm not getting the results I expect. With the example code below, I would expect 4 records to be output (one for each order), but instead I'm only seeing a (random) subset of these records, and the subset varies on each run. To compound my confusion further, the CSV output often shows a different subset of results than the one written to the console. I assume there's a race condition of some sort, but I can't figure out where it is. Any ideas what I'm doing wrong?

    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.functions.TemporalTableFunction;
    import org.apache.flink.table.sinks.CsvTableSink;
    import org.apache.flink.types.Row;

    public class Test {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

            List<Tuple2<String, Double>> rateData = Arrays.asList(
                    new Tuple2<>("GBP", 1.29),
                    new Tuple2<>("EUR", 1.14),
                    new Tuple2<>("EUR", 1.15),
                    new Tuple2<>("GBP", 1.30));
            DataStreamSource<Tuple2<String, Double>> rateStream = env.addSource(new DelayedSource<>(rateData, 1L));
            rateStream.returns(new TypeHint<Tuple2<String, Double>>() {});

            Table rateHistory = tableEnv.fromDataStream(rateStream, "Currency, Rate, FxRates_ProcTime.proctime");
            TemporalTableFunction rates = rateHistory.createTemporalTableFunction("FxRates_ProcTime", "Currency");
            tableEnv.registerFunction("FxRates", rates);

            List<Tuple3<Integer, String, Double>> orderData = Arrays.asList(
                    new Tuple3<>(1, "GBP", 4.51),
                    new Tuple3<>(2, "GBP", 23.68),
                    new Tuple3<>(3, "EUR", 2.99),
                    new Tuple3<>(4, "EUR", 14.76));
            DataStreamSource<Tuple3<Integer, String, Double>> orderStream = env.addSource(new DelayedSource<>(orderData, 100L));
            orderStream.returns(new TypeHint<Tuple3<Integer, String, Double>>() {});

            Table orders = tableEnv.fromDataStream(orderStream, "OrderId, o_Currency, Amount, Order_ProcTime.proctime");
            Table usdOrders = orders.join(new Table(tableEnv, "FxRates(Order_ProcTime)"), "o_Currency = Currency")
                    .select("OrderId, Amount, Currency, Rate, (Amount * Rate) as UsdAmount");

            String[] fields = usdOrders.getSchema().getFieldNames();
            TypeInformation<?>[] types = usdOrders.getSchema().getFieldTypes();
            DataStream<Row> usdStream = tableEnv.toAppendStream(usdOrders, usdOrders.getSchema().toRowType());

            CsvTableSink csvTableSink = new CsvTableSink("C:\\tmp\\test.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);
            tableEnv.registerTableSink("csvSink", fields, types, csvTableSink);
            usdOrders.insertInto("csvSink");

            usdStream.addSink(new PrintSink());

            env.execute();
            System.out.println("Test completed at " + time());
        }

        public static String time() {
            return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
        }

        private static class DelayedSource<T> extends RichSourceFunction<T> {

            private final List<T> data;
            private final long initialDelay;
            private volatile boolean shutdown;

            private DelayedSource(List<T> data, long initialDelay) {
                this.data = data;
                this.initialDelay = initialDelay;
            }

            @Override
            public void run(SourceContext<T> ctx) throws Exception {
                Iterator<T> iterator = data.iterator();
                Thread.sleep(initialDelay);
                while (!shutdown && iterator.hasNext()) {
                    T next = iterator.next();
                    System.out.println(time() + " - producing " + next);
                    ctx.collect(next);
                }
            }

            @Override
            public void cancel() {
                shutdown = true;
            }
        }

        private static class PrintSink extends RichSinkFunction<Row> {
            @Override
            public void invoke(Row value, Context context) {
                Integer orderId = (Integer) value.getField(0);
                Double amount = (Double) value.getField(1);
                String currency = (String) value.getField(2);
                Double rate = (Double) value.getField(3);
                Double usdAmount = (Double) value.getField(4);
                System.out.println(time() + " - order " + orderId + " was for " + usdAmount
                        + " USD (" + amount + ' ' + currency + " @ " + rate + ')');
            }
        }
    }
Hi,

The problem is that you are using processing time, which is non-deterministic. Both inputs are consumed concurrently, and each order is joined with whichever rate records happen to have arrived before it, so the result depends on a race condition.

If you change the input tables to have event-time attributes and use these to register the time-versioned table, the results should become stable.

Best,
Fabian
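To see why event time makes the result stable, here is a minimal plain-Java sketch (not Flink code) of the lookup an event-time temporal table performs: every rate version is filed under its own event timestamp per currency, and an order is matched with the latest version whose timestamp is not after the order's timestamp. The class names and timestamps are hypothetical, and the sketch ignores watermarks, which Flink uses to decide when it is safe to emit a joined row. In the Table API, this corresponds to declaring the time attributes with `.rowtime` instead of `.proctime`, which in turn requires the DataStreams to carry timestamps and watermarks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of an event-time versioned table (not Flink API):
// each rate version is stored under its event timestamp, per currency key.
class VersionedRates {
    private final Map<String, TreeMap<Long, Double>> history = new HashMap<>();

    // Arrival order does not matter: a version is filed under its own timestamp.
    void add(String currency, long eventTime, double rate) {
        history.computeIfAbsent(currency, c -> new TreeMap<>()).put(eventTime, rate);
    }

    // Rate valid at eventTime = latest version with timestamp <= eventTime,
    // or null if no version for this currency existed yet.
    Double rateAt(String currency, long eventTime) {
        TreeMap<Long, Double> versions = history.get(currency);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, Double> entry = versions.floorEntry(eventTime);
        return entry == null ? null : entry.getValue();
    }
}

class EventTimeJoinSketch {
    public static void main(String[] args) {
        // Hypothetical event timestamps (ms); the rates mirror the question's data.
        VersionedRates rates = new VersionedRates();
        rates.add("GBP", 1_000L, 1.30); // arrives first ...
        rates.add("EUR", 500L, 1.14);
        rates.add("GBP", 0L, 1.29);     // ... but this is the older GBP version
        rates.add("EUR", 1_500L, 1.15);

        System.out.println(rates.rateAt("GBP", 800L));   // 1.29
        System.out.println(rates.rateAt("GBP", 1_200L)); // 1.3
        System.out.println(rates.rateAt("EUR", 400L));   // null: no EUR rate yet
    }
}
```

Because `add` files each version under its own timestamp, loading the same four rates in any order yields the same answer for every lookup, which is the property that makes the event-time join repeatable.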
Hi Fabian,

I was investigating this further today and was just coming to the same conclusion! Thanks for the confirmation; I'll make the suggested changes and see where that gets me.

Originally I had assumed that the processing time for the rates table would be set when the rates are first produced by DelayedSource, but I guess that concept is closer to ingestion time than processing time. My takeaway from this is that it's never a good idea to compare two different processing-time fields, since it's not clear which of the two gets assigned first.

Regards,
Chris
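The takeaway above can be illustrated with a second hypothetical, single-threaded model of a processing-time temporal join (again plain Java, not Flink's implementation): each order is matched against whatever rate for its currency has been received so far, and, as with an inner join, an order that finds no rate produces no output. Feeding the model two interleavings of the same records shows how a varying subset of results can arise.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a processing-time temporal join (not Flink API):
// an order sees only the rates that arrived before it.
class ProcTimeJoinSketch {

    // One record reaching the join: either a rate update or an order.
    static final class Arrival {
        final boolean isRate;
        final String currency;
        final double rate;  // only meaningful for rate updates
        final int orderId;  // only meaningful for orders

        private Arrival(boolean isRate, String currency, double rate, int orderId) {
            this.isRate = isRate;
            this.currency = currency;
            this.rate = rate;
            this.orderId = orderId;
        }

        static Arrival rate(String currency, double rate) {
            return new Arrival(true, currency, rate, 0);
        }

        static Arrival order(int orderId, String currency) {
            return new Arrival(false, currency, 0.0, orderId);
        }
    }

    // Returns "orderId@rate" for each order that found a rate; orders that
    // arrive before any rate for their currency are silently dropped.
    static List<String> join(List<Arrival> arrivals) {
        Map<String, Double> latest = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Arrival a : arrivals) {
            if (a.isRate) {
                latest.put(a.currency, a.rate);
            } else {
                Double rate = latest.get(a.currency);
                if (rate != null) {
                    out.add(a.orderId + "@" + rate);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The same four records in two possible interleavings of the two sources.
        List<Arrival> ratesFirst = Arrays.asList(
                Arrival.rate("GBP", 1.29), Arrival.rate("EUR", 1.14),
                Arrival.order(1, "GBP"), Arrival.order(3, "EUR"));
        List<Arrival> ordersFirst = Arrays.asList(
                Arrival.order(1, "GBP"), Arrival.rate("GBP", 1.29),
                Arrival.order(3, "EUR"), Arrival.rate("EUR", 1.14));
        System.out.println(join(ratesFirst));  // [1@1.29, 3@1.14]
        System.out.println(join(ordersFirst)); // []
    }
}
```

Only the interleaving differs between the two calls, yet the output goes from two joined rows to none; in a real job the interleaving depends on thread scheduling, so it can change from run to run.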
