// Imports reconstructed for this excerpt; they assume Flink 1.10.x (Blink planner plus the
// now-deprecated TableSink interfaces used below), JUnit 4, and Guava's ImmutableList.
// The test method and the nested helper classes belong to a single JUnit test class.
import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.junit.Test;

@Test
public void retractStream() throws Exception {
    // Use the Blink planner in streaming mode.
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, settings);
    // A single two-column input row.
    Row row1 = new Row(2);
    row1.setField(0, "1");
    row1.setField(1, "1");

    // Declare the Row type explicitly so the collection source is not typed as a generic type;
    // otherwise the two-column field mapping in createTemporaryView cannot be applied.
    RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    SingleOutputStreamOperator<Row> source =
            executionEnvironment.fromCollection(ImmutableList.of(row1), rowTypeInfo).setParallelism(1);
    tableEnvironment.createTemporaryView("table1", source, "key, id");

    Table outputTable = tableEnvironment.sqlQuery("select id, key from table1");

    // Convert the query result into a retract stream of (change flag, row) pairs.
    DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(outputTable, rowTypeInfo);
    // The commented-out variant below works on the legacy Flink planner but fails on the
    // Blink planner, because the Blink planner treats all non-tuple types as POJOs.
    // SingleOutputStreamOperator<CRow> tuple2DataStream = tableEnvironment
    //         .toRetractStream(outputTable, rowTypeInfo)
    //         .map(value -> new CRow(value.f1, value.f0))
    //         .returns(new CRowTypeInfo(rowTypeInfo));
    tableEnvironment.createTemporaryView("outputTable", tuple2DataStream);

    // Create a retracting table sink
    TableSchema.Builder schemaBuilder = TableSchema.builder();
    schemaBuilder.field("key", DataTypes.STRING());
    schemaBuilder.field("id", DataTypes.STRING());
    TableSchema schema = schemaBuilder.build();
    RetractSink retractTableSink = new RetractSink(new CollectingTableSink(schema));
    tableEnvironment.registerTableSink("sink2", retractTableSink);

    // Wire up the output to the sink
    tableEnvironment.from("outputTable").insertInto("sink2");
    executionEnvironment.execute();
}
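// Adapts an append-only sink to the RetractStreamTableSink interface: the Boolean change
// flag is stripped off (retraction messages are dropped by TupleMapper) and the bare rows
// are handed to the wrapped AppendStreamTableSink.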
private static class RetractSink implements RetractStreamTableSink<Row> {

    private final AppendStreamTableSink<Row> delegate;

    RetractSink(AppendStreamTableSink<Row> delegate) {
        this.delegate = delegate;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return delegate.getOutputType();
    }

    @Override
    public TableSchema getTableSchema() {
        return delegate.getTableSchema();
    }

    @Override
    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
        return new TupleTypeInfo<>(Types.BOOLEAN(), getRecordType());
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        consumeDataStream(dataStream);
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        DataStream<Row> filteredAndMapped =
                dataStream.flatMap(new TupleMapper()).returns(getRecordType());
        return delegate.consumeDataStream(filteredAndMapped);
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        throw new UnsupportedOperationException();
    }
}
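// Forwards only insert (accumulate) messages, i.e. tuples whose Boolean flag is true;
// retraction messages are dropped.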
private static final class TupleMapper implements FlatMapFunction<Tuple2<Boolean, Row>, Row> {

    @Override
    public void flatMap(Tuple2<Boolean, Row> value, Collector<Row> out) {
        if (value.f0) {
            out.collect(value.f1);
        }
    }
}
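// CollectingTableSink is referenced above but was not included in the original snippet.
// The class below is a hypothetical stand-in, sketched only so the example compiles and
// runs locally: it is assumed to be a plain AppendStreamTableSink that gathers emitted
// rows into a static in-memory list. The real implementation may differ.
private static final class CollectingTableSink implements AppendStreamTableSink<Row> {

    // Shared buffer the test could assert on after the job finishes.
    static final java.util.List<Row> COLLECTED =
            java.util.Collections.synchronizedList(new java.util.ArrayList<>());

    private final TableSchema schema;

    CollectingTableSink(TableSchema schema) {
        this.schema = schema;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
    }

    @Override
    public TableSchema getTableSchema() {
        return schema;
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        consumeDataStream(dataStream);
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        // Append every incoming row to the shared list; sufficient for a local test.
        return dataStream.addSink(
                new org.apache.flink.streaming.api.functions.sink.SinkFunction<Row>() {
                    @Override
                    public void invoke(Row value) {
                        COLLECTED.add(value);
                    }
                });
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return this;
    }
}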