import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
/**
 * Shows how to turn a {@code DataStream} of tuples and a {@code DataStream} of POJOs into
 * Tables with the same {@code (currency, rate)} schema.
 *
 * <p>The POJO conversion only works when Flink's type extraction recognises {@link FxRate} as a
 * POJO type. That requires, among other rules, a public no-argument constructor. Without it the
 * class is handled as a generic (atomic) type exposing a single opaque column, and any attempt
 * to reference {@code currency} or {@code rate} fails with a {@code ValidationException} such as
 * "Too many fields referenced from an atomic type".
 */
public class PojoTest {

    /**
     * Builds both tables, prints their schemas, and runs the job so the POJO rows are printed.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // Tuples: fields are mapped to the given column names by position.
        List<Tuple2<String, Double>> tupleData = Arrays.asList(
                new Tuple2<>("USD", 1.0),
                new Tuple2<>("GBP", 1.3),
                new Tuple2<>("EUR", 1.11));
        DataStreamSource<Tuple2<String, Double>> tupleStream = streamEnv.fromCollection(tupleData);
        tableEnv.fromDataStream(tupleStream, "currency, rate").printSchema();

        // POJOs: fields are mapped by name, so the expression must use the POJO's field names.
        List<FxRate> pojoData = Arrays.asList(
                new FxRate("USD", 1.0),
                new FxRate("GBP", 1.3),
                new FxRate("EUR", 1.11));
        DataStreamSource<FxRate> pojoStream = streamEnv.fromCollection(pojoData);
        Table pojoTable = tableEnv.fromDataStream(pojoStream, "currency, rate");
        pojoTable.printSchema();

        // execute() rejects a topology without any operator/sink, so attach a print sink that
        // converts the table rows back into FxRate instances.
        tableEnv.toAppendStream(pojoTable, FxRate.class).print();
        streamEnv.execute();
    }

    /**
     * Exchange rate of a currency, modelled as a Flink POJO: public static nested class, public
     * no-argument constructor, and public non-final fields.
     */
    public static class FxRate {
        /** Currency code, e.g. {@code "USD"}. */
        public String currency;
        /** Exchange rate for {@link #currency}. */
        public double rate;

        /** No-argument constructor required for Flink to treat this class as a POJO type. */
        public FxRate() {
        }

        /**
         * @param currency currency code
         * @param rate exchange rate for the currency
         */
        public FxRate(String currency, double rate) {
            this.currency = currency;
            this.rate = rate;
        }

        @Override
        public String toString() {
            return "FxRate{currency='" + currency + '\'' + ", rate=" + rate + '}';
        }
    }
}