Temporal tables not behaving as expected

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Temporal tables not behaving as expected

Chris Miller-2
Hi all,

I'm new to Flink so am probably missing something simple. I'm using Flink 1.7.1 and am trying to use temporal table functions, but I'm not getting the results I expect. With the example code below, I would expect 4 records to be output (one for each order), but instead I'm only seeing a (random) subset of these records (it varies on each run). To compound my confusion further, the CSV output often shows a different subset of results than those written to the console. I assume there's a race condition of some sort but I can't figure out where it is. Any ideas what I'm doing wrong?


import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

/**
 * Reproduction case for a processing-time temporal table join: a stream of orders is joined
 * against a stream of currency rates exposed as the temporal table function {@code FxRates}.
 *
 * <p>NOTE: both tables declare {@code .proctime} attributes, so which rate version an order is
 * matched with (if any) depends on when records from the two sources arrive at runtime, not on
 * the order of the elements in the lists below. The output can therefore differ between runs.
 */
public class Test {
  /**
   * Builds the pipeline (two delayed sources, temporal join, CSV sink and console sink) and
   * executes it.
   *
   * @param args unused
   * @throws Exception propagated from the Flink calls below (e.g. {@code env.execute()})
   */
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // Rate updates as (Currency, Rate) pairs; each currency appears twice, i.e. has two versions.
    List<Tuple2<String, Double>> rateData = Arrays.asList(
        new Tuple2<>("GBP", 1.29),
        new Tuple2<>("EUR", 1.14),
        new Tuple2<>("EUR", 1.15),
        new Tuple2<>("GBP", 1.30));
    // The rate source waits 1 ms before emitting, then emits all four rates back-to-back.
    DataStreamSource<Tuple2<String, Double>> rateStream = env.addSource(new DelayedSource<>(rateData, 1L));
    // Explicit element type for the source output (presumably required because DelayedSource is
    // generic - confirm).
    rateStream.returns(new TypeHint<Tuple2<String, Double>>() {});

    // Expose the rates as a table with an appended processing-time attribute, then register it
    // as the temporal table function "FxRates" (time attribute FxRates_ProcTime, key Currency).
    Table rateHistory = tableEnv.fromDataStream(rateStream, "Currency, Rate, FxRates_ProcTime.proctime");
    TemporalTableFunction rates = rateHistory.createTemporalTableFunction("FxRates_ProcTime", "Currency");
    tableEnv.registerFunction("FxRates", rates);

    // Orders as (OrderId, o_Currency, Amount) triples.
    List<Tuple3<Integer, String, Double>> orderData = Arrays.asList(
        new Tuple3<>(1, "GBP", 4.51),
        new Tuple3<>(2, "GBP", 23.68),
        new Tuple3<>(3, "EUR", 2.99),
        new Tuple3<>(4, "EUR", 14.76));

    // The order source waits 100 ms before emitting. This only delays emission at the source;
    // nothing here guarantees the rates have reached the join operator before the orders do.
    DataStreamSource<Tuple3<Integer, String, Double>> orderStream = env.addSource(new DelayedSource<>(orderData, 100L));
    orderStream.returns(new TypeHint<Tuple3<Integer, String, Double>>() {});

    // Join each order with the FxRates version valid at the order's processing time, matching
    // on currency, and compute the converted amount.
    Table orders = tableEnv.fromDataStream(orderStream, "OrderId, o_Currency, Amount, Order_ProcTime.proctime");
    Table usdOrders = orders.join(new Table(tableEnv, "FxRates(Order_ProcTime)"), "o_Currency = Currency")
                            .select("OrderId, Amount, Currency, Rate, (Amount * Rate) as UsdAmount");

    String[] fields = usdOrders.getSchema().getFieldNames();
    TypeInformation<?>[] types = usdOrders.getSchema().getFieldTypes();
    // The same query result is handed to two sinks: the console (via toAppendStream + PrintSink)
    // and a CSV file (via insertInto).
    // NOTE(review): presumably each sink gets its own translation of the join, which would
    // explain the two outputs showing different subsets - confirm.
    DataStream<Row> usdStream = tableEnv.toAppendStream(usdOrders, usdOrders.getSchema().toRowType());
    // CSV sink arguments: output path, field delimiter, number of files, write mode.
    CsvTableSink csvTableSink = new CsvTableSink("C:\\tmp\\test.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);
    tableEnv.registerTableSink("csvSink", fields, types, csvTableSink);
    usdOrders.insertInto("csvSink");
    usdStream.addSink(new PrintSink());
    env.execute();
    System.out.println("Test completed at " + time());
  }

  /**
   * Returns the current local time formatted with {@link DateTimeFormatter#ISO_LOCAL_TIME};
   * used to timestamp the console messages printed by this class.
   *
   * @return the formatted current local time
   */
  public static String time() {
    return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
  }

  /**
   * Source that sleeps for {@code initialDelay} milliseconds and then emits every element of
   * {@code data} in list order, with no pause between elements.
   *
   * @param <T> element type emitted by the source
   */
  private static class DelayedSource<T> extends RichSourceFunction<T> {
    private final List<T> data;
    private final long initialDelay;
    // Set by cancel() and checked by the emit loop in run(); volatile so the write is visible.
    private volatile boolean shutdown;

    /**
     * @param data elements to emit, in order
     * @param initialDelay time in milliseconds to sleep before the first element is emitted
     */
    private DelayedSource(List<T> data, long initialDelay) {
      this.data = data;
      this.initialDelay = initialDelay;
    }

    /**
     * Sleeps for the initial delay, then logs and emits each element until the data is
     * exhausted or {@link #cancel()} has been called. Returns once the loop ends.
     */
    @Override
    public void run(SourceContext<T> ctx) throws Exception {
      Iterator<T> iterator = data.iterator();
      Thread.sleep(initialDelay);
      while (!shutdown && iterator.hasNext()) {
        T next = iterator.next();
        System.out.println(time() + " - producing " + next);
        ctx.collect(next);
      }
    }

    /** Requests that the emit loop in {@link #run} stop before the next element. */
    @Override
    public void cancel() {
      shutdown = true;
    }
  }

  /**
   * Sink that prints one console line per joined row. Field positions follow the select clause
   * in {@code main}: OrderId, Amount, Currency, Rate, UsdAmount.
   */
  private static class PrintSink extends RichSinkFunction<Row> {
    @Override
    public void invoke(Row value, Context context) {
      // The casts assume the column types produced by the query in main.
      Integer orderId = (Integer) value.getField(0);
      Double amount = (Double) value.getField(1);
      String currency = (String) value.getField(2);
      Double rate = (Double) value.getField(3);
      Double usdAmount = (Double) value.getField(4);
      System.out.println(time() + " - order " + orderId + " was for " + usdAmount + " USD (" + amount + ' ' + currency + " @ " + rate + ')');
    }
  }
}
Reply | Threaded
Open this post in threaded view
|

Re: Temporal tables not behaving as expected

Fabian Hueske-2
Hi,

The problem is that you are using processing time, which is non-deterministic.
Both inputs are consumed at the same time and joined based on which record arrived first. The result depends on a race condition.

If you change the input table to have event time attributes and use these to register the time-versioned table, the results should become stable.

Best, Fabian

Am Mo., 21. Jan. 2019 um 15:45 Uhr schrieb Chris Miller <[hidden email]>:
Hi all,

I'm new to Flink so am probably missing something simple. I'm using Flink 1.7.1 and am trying to use temporal table functions, but I'm not getting the results I expect. With the example code below, I would expect 4 records to be output (one for each order), but instead I'm only seeing a (random) subset of these records (it varies on each run). To compound my confusion further, the CSV output often shows a different subset of results than those written to the console. I assume there's a race condition of some sort but I can't figure out where it is. Any ideas what I'm doing wrong?


import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

/**
 * Reproduction case for a processing-time temporal table join: a stream of orders is joined
 * against a stream of currency rates exposed as the temporal table function {@code FxRates}.
 *
 * <p>NOTE: both tables declare {@code .proctime} attributes, so which rate version an order is
 * matched with (if any) depends on when records from the two sources arrive at runtime, not on
 * the order of the elements in the lists below. The output can therefore differ between runs.
 */
public class Test {
  /**
   * Builds the pipeline (two delayed sources, temporal join, CSV sink and console sink) and
   * executes it.
   *
   * @param args unused
   * @throws Exception propagated from the Flink calls below (e.g. {@code env.execute()})
   */
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // Rate updates as (Currency, Rate) pairs; each currency appears twice, i.e. has two versions.
    List<Tuple2<String, Double>> rateData = Arrays.asList(
        new Tuple2<>("GBP", 1.29),
        new Tuple2<>("EUR", 1.14),
        new Tuple2<>("EUR", 1.15),
        new Tuple2<>("GBP", 1.30));
    // The rate source waits 1 ms before emitting, then emits all four rates back-to-back.
    DataStreamSource<Tuple2<String, Double>> rateStream = env.addSource(new DelayedSource<>(rateData, 1L));
    // Explicit element type for the source output (presumably required because DelayedSource is
    // generic - confirm).
    rateStream.returns(new TypeHint<Tuple2<String, Double>>() {});

    // Expose the rates as a table with an appended processing-time attribute, then register it
    // as the temporal table function "FxRates" (time attribute FxRates_ProcTime, key Currency).
    Table rateHistory = tableEnv.fromDataStream(rateStream, "Currency, Rate, FxRates_ProcTime.proctime");
    TemporalTableFunction rates = rateHistory.createTemporalTableFunction("FxRates_ProcTime", "Currency");
    tableEnv.registerFunction("FxRates", rates);

    // Orders as (OrderId, o_Currency, Amount) triples.
    List<Tuple3<Integer, String, Double>> orderData = Arrays.asList(
        new Tuple3<>(1, "GBP", 4.51),
        new Tuple3<>(2, "GBP", 23.68),
        new Tuple3<>(3, "EUR", 2.99),
        new Tuple3<>(4, "EUR", 14.76));

    // The order source waits 100 ms before emitting. This only delays emission at the source;
    // nothing here guarantees the rates have reached the join operator before the orders do.
    DataStreamSource<Tuple3<Integer, String, Double>> orderStream = env.addSource(new DelayedSource<>(orderData, 100L));
    orderStream.returns(new TypeHint<Tuple3<Integer, String, Double>>() {});

    // Join each order with the FxRates version valid at the order's processing time, matching
    // on currency, and compute the converted amount.
    Table orders = tableEnv.fromDataStream(orderStream, "OrderId, o_Currency, Amount, Order_ProcTime.proctime");
    Table usdOrders = orders.join(new Table(tableEnv, "FxRates(Order_ProcTime)"), "o_Currency = Currency")
                            .select("OrderId, Amount, Currency, Rate, (Amount * Rate) as UsdAmount");

    String[] fields = usdOrders.getSchema().getFieldNames();
    TypeInformation<?>[] types = usdOrders.getSchema().getFieldTypes();
    // The same query result is handed to two sinks: the console (via toAppendStream + PrintSink)
    // and a CSV file (via insertInto).
    // NOTE(review): presumably each sink gets its own translation of the join, which would
    // explain the two outputs showing different subsets - confirm.
    DataStream<Row> usdStream = tableEnv.toAppendStream(usdOrders, usdOrders.getSchema().toRowType());
    // CSV sink arguments: output path, field delimiter, number of files, write mode.
    CsvTableSink csvTableSink = new CsvTableSink("C:\\tmp\\test.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);
    tableEnv.registerTableSink("csvSink", fields, types, csvTableSink);
    usdOrders.insertInto("csvSink");
    usdStream.addSink(new PrintSink());
    env.execute();
    System.out.println("Test completed at " + time());
  }

  /**
   * Returns the current local time formatted with {@link DateTimeFormatter#ISO_LOCAL_TIME};
   * used to timestamp the console messages printed by this class.
   *
   * @return the formatted current local time
   */
  public static String time() {
    return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
  }

  /**
   * Source that sleeps for {@code initialDelay} milliseconds and then emits every element of
   * {@code data} in list order, with no pause between elements.
   *
   * @param <T> element type emitted by the source
   */
  private static class DelayedSource<T> extends RichSourceFunction<T> {
    private final List<T> data;
    private final long initialDelay;
    // Set by cancel() and checked by the emit loop in run(); volatile so the write is visible.
    private volatile boolean shutdown;

    /**
     * @param data elements to emit, in order
     * @param initialDelay time in milliseconds to sleep before the first element is emitted
     */
    private DelayedSource(List<T> data, long initialDelay) {
      this.data = data;
      this.initialDelay = initialDelay;
    }

    /**
     * Sleeps for the initial delay, then logs and emits each element until the data is
     * exhausted or {@link #cancel()} has been called. Returns once the loop ends.
     */
    @Override
    public void run(SourceContext<T> ctx) throws Exception {
      Iterator<T> iterator = data.iterator();
      Thread.sleep(initialDelay);
      while (!shutdown && iterator.hasNext()) {
        T next = iterator.next();
        System.out.println(time() + " - producing " + next);
        ctx.collect(next);
      }
    }

    /** Requests that the emit loop in {@link #run} stop before the next element. */
    @Override
    public void cancel() {
      shutdown = true;
    }
  }

  /**
   * Sink that prints one console line per joined row. Field positions follow the select clause
   * in {@code main}: OrderId, Amount, Currency, Rate, UsdAmount.
   */
  private static class PrintSink extends RichSinkFunction<Row> {
    @Override
    public void invoke(Row value, Context context) {
      // The casts assume the column types produced by the query in main.
      Integer orderId = (Integer) value.getField(0);
      Double amount = (Double) value.getField(1);
      String currency = (String) value.getField(2);
      Double rate = (Double) value.getField(3);
      Double usdAmount = (Double) value.getField(4);
      System.out.println(time() + " - order " + orderId + " was for " + usdAmount + " USD (" + amount + ' ' + currency + " @ " + rate + ')');
    }
  }
}
Reply | Threaded
Open this post in threaded view
|

Re: Temporal tables not behaving as expected

Chris Miller-2
Hi Fabian,

I was investigating this further today and was just coming to the same conclusion! Thanks for the confirmation, I'll make the suggested changes and see where that gets me. Originally I had assumed processing time for the rates table would be set when the rates are first produced by DelayedSource, but I guess that concept is closer to ingestion time than processing time. My takeaway from this is that it's never a good idea to be comparing two different processing time fields since it's not clear which of the two gets assigned first.

Regards,
Chris

------ Original Message ------
From: "Fabian Hueske" <[hidden email]>
To: "Chris Miller" <[hidden email]>
Cc: "user" <[hidden email]>
Sent: 22/01/2019 11:23:23
Subject: Re: Temporal tables not behaving as expected

Hi,

The problem is that you are using processing time, which is non-deterministic.
Both inputs are consumed at the same time and joined based on which record arrived first. The result depends on a race condition.

If you change the input table to have event time attributes and use these to register the time-versioned table, the results should become stable.

Best, Fabian

Am Mo., 21. Jan. 2019 um 15:45 Uhr schrieb Chris Miller <[hidden email]>:
Hi all,

I'm new to Flink so am probably missing something simple. I'm using Flink 1.7.1 and am trying to use temporal table functions, but I'm not getting the results I expect. With the example code below, I would expect 4 records to be output (one for each order), but instead I'm only seeing a (random) subset of these records (it varies on each run). To compound my confusion further, the CSV output often shows a different subset of results than those written to the console. I assume there's a race condition of some sort but I can't figure out where it is. Any ideas what I'm doing wrong?


import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

/**
 * Reproduction case for a processing-time temporal table join: a stream of orders is joined
 * against a stream of currency rates exposed as the temporal table function {@code FxRates}.
 *
 * <p>NOTE: both tables declare {@code .proctime} attributes, so which rate version an order is
 * matched with (if any) depends on when records from the two sources arrive at runtime, not on
 * the order of the elements in the lists below. The output can therefore differ between runs.
 */
public class Test {
  /**
   * Builds the pipeline (two delayed sources, temporal join, CSV sink and console sink) and
   * executes it.
   *
   * @param args unused
   * @throws Exception propagated from the Flink calls below (e.g. {@code env.execute()})
   */
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // Rate updates as (Currency, Rate) pairs; each currency appears twice, i.e. has two versions.
    List<Tuple2<String, Double>> rateData = Arrays.asList(
        new Tuple2<>("GBP", 1.29),
        new Tuple2<>("EUR", 1.14),
        new Tuple2<>("EUR", 1.15),
        new Tuple2<>("GBP", 1.30));
    // The rate source waits 1 ms before emitting, then emits all four rates back-to-back.
    DataStreamSource<Tuple2<String, Double>> rateStream = env.addSource(new DelayedSource<>(rateData, 1L));
    // Explicit element type for the source output (presumably required because DelayedSource is
    // generic - confirm).
    rateStream.returns(new TypeHint<Tuple2<String, Double>>() {});

    // Expose the rates as a table with an appended processing-time attribute, then register it
    // as the temporal table function "FxRates" (time attribute FxRates_ProcTime, key Currency).
    Table rateHistory = tableEnv.fromDataStream(rateStream, "Currency, Rate, FxRates_ProcTime.proctime");
    TemporalTableFunction rates = rateHistory.createTemporalTableFunction("FxRates_ProcTime", "Currency");
    tableEnv.registerFunction("FxRates", rates);

    // Orders as (OrderId, o_Currency, Amount) triples.
    List<Tuple3<Integer, String, Double>> orderData = Arrays.asList(
        new Tuple3<>(1, "GBP", 4.51),
        new Tuple3<>(2, "GBP", 23.68),
        new Tuple3<>(3, "EUR", 2.99),
        new Tuple3<>(4, "EUR", 14.76));

    // The order source waits 100 ms before emitting. This only delays emission at the source;
    // nothing here guarantees the rates have reached the join operator before the orders do.
    DataStreamSource<Tuple3<Integer, String, Double>> orderStream = env.addSource(new DelayedSource<>(orderData, 100L));
    orderStream.returns(new TypeHint<Tuple3<Integer, String, Double>>() {});

    // Join each order with the FxRates version valid at the order's processing time, matching
    // on currency, and compute the converted amount.
    Table orders = tableEnv.fromDataStream(orderStream, "OrderId, o_Currency, Amount, Order_ProcTime.proctime");
    Table usdOrders = orders.join(new Table(tableEnv, "FxRates(Order_ProcTime)"), "o_Currency = Currency")
                            .select("OrderId, Amount, Currency, Rate, (Amount * Rate) as UsdAmount");

    String[] fields = usdOrders.getSchema().getFieldNames();
    TypeInformation<?>[] types = usdOrders.getSchema().getFieldTypes();
    // The same query result is handed to two sinks: the console (via toAppendStream + PrintSink)
    // and a CSV file (via insertInto).
    // NOTE(review): presumably each sink gets its own translation of the join, which would
    // explain the two outputs showing different subsets - confirm.
    DataStream<Row> usdStream = tableEnv.toAppendStream(usdOrders, usdOrders.getSchema().toRowType());
    // CSV sink arguments: output path, field delimiter, number of files, write mode.
    CsvTableSink csvTableSink = new CsvTableSink("C:\\tmp\\test.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);
    tableEnv.registerTableSink("csvSink", fields, types, csvTableSink);
    usdOrders.insertInto("csvSink");
    usdStream.addSink(new PrintSink());
    env.execute();
    System.out.println("Test completed at " + time());
  }

  /**
   * Returns the current local time formatted with {@link DateTimeFormatter#ISO_LOCAL_TIME};
   * used to timestamp the console messages printed by this class.
   *
   * @return the formatted current local time
   */
  public static String time() {
    return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
  }

  /**
   * Source that sleeps for {@code initialDelay} milliseconds and then emits every element of
   * {@code data} in list order, with no pause between elements.
   *
   * @param <T> element type emitted by the source
   */
  private static class DelayedSource<T> extends RichSourceFunction<T> {
    private final List<T> data;
    private final long initialDelay;
    // Set by cancel() and checked by the emit loop in run(); volatile so the write is visible.
    private volatile boolean shutdown;

    /**
     * @param data elements to emit, in order
     * @param initialDelay time in milliseconds to sleep before the first element is emitted
     */
    private DelayedSource(List<T> data, long initialDelay) {
      this.data = data;
      this.initialDelay = initialDelay;
    }

    /**
     * Sleeps for the initial delay, then logs and emits each element until the data is
     * exhausted or {@link #cancel()} has been called. Returns once the loop ends.
     */
    @Override
    public void run(SourceContext<T> ctx) throws Exception {
      Iterator<T> iterator = data.iterator();
      Thread.sleep(initialDelay);
      while (!shutdown && iterator.hasNext()) {
        T next = iterator.next();
        System.out.println(time() + " - producing " + next);
        ctx.collect(next);
      }
    }

    /** Requests that the emit loop in {@link #run} stop before the next element. */
    @Override
    public void cancel() {
      shutdown = true;
    }
  }

  /**
   * Sink that prints one console line per joined row. Field positions follow the select clause
   * in {@code main}: OrderId, Amount, Currency, Rate, UsdAmount.
   */
  private static class PrintSink extends RichSinkFunction<Row> {
    @Override
    public void invoke(Row value, Context context) {
      // The casts assume the column types produced by the query in main.
      Integer orderId = (Integer) value.getField(0);
      Double amount = (Double) value.getField(1);
      String currency = (String) value.getField(2);
      Double rate = (Double) value.getField(3);
      Double usdAmount = (Double) value.getField(4);
      System.out.println(time() + " - order " + orderId + " was for " + usdAmount + " USD (" + amount + ' ' + currency + " @ " + rate + ')');
    }
  }
}