Blink Planner Retracting Streams

John Mathews
Hello,

I am working on migrating from the flink table-planner to the new blink one, and one problem I am running into is that it doesn't seem like Blink has a concept of a CRow, unlike the original table-planner.

I am therefore struggling to figure out how to properly convert a retracting stream to a SingleOutputStreamOperator when using just the Blink planner libraries.

E.g. in the old planner I could do something like this: 
SingleOutputStreamOperator<CRow> stream = tableEnvironment.toRetractStream(table, typeInfo)
                    .map(value -> new CRow(value.f1, value.f0));

but without the CRow, I'm not sure how to accomplish this.

Any suggestions?

Thanks!
John


Re: Blink Planner Retracting Streams

godfrey he
hi John,

You can use Tuple2[Boolean, Row] in place of CRow; the StreamTableEnvironment#toRetractStream method returns a DataStream[(Boolean, T)].

The code looks like this:

tEnv.toRetractStream[Row](table).map(new MapFunction[(Boolean, Row), R] {
      override def map(value: (Boolean, Row)): R = ...
    })

Bests,
Godfrey

In reply to John Mathews (Wed, Jun 17, 2020 at 12:13 PM).


Re: Blink Planner Retracting Streams

John Mathews
Hello Godfrey, 

Thanks for the response! 

I think the problem with Tuple2 is this: if my understanding of how CRow worked is correct, calling getSchema() with a CRow returned the underlying schema of the row it contained. Tuple2 doesn't do that, so it breaks a lot of our downstream code that relies on the TableSchema being the underlying row's schema, not a Tuple schema.

Any thoughts on that issue?


In reply to godfrey he (Wed, Jun 17, 2020 at 12:16 AM).


Re: Blink Planner Retracting Streams

Jark Wu-3
Hi John,

Maybe I misunderstand something, but CRow doesn't have a `getSchema()` method. You can call getSchema() on the Table; this also works if you convert the table into Tuple2<Boolean, Row>.
Actually, there is no big difference between CRow and Tuple2<Boolean, Row>: they both wrap the change flag and the Row.
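
As a rough illustration of the change flag that both wrappers carry, here is a minimal plain-Java sketch with no Flink dependency. `Change` and `materialize` are hypothetical stand-ins invented for this example (they are not Flink classes); the sketch only assumes the usual retract-stream convention that a `true` flag adds a row and a `false` flag retracts a previously added one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RetractFlagSketch {

    /** Hypothetical stand-in for Tuple2<Boolean, Row>: a change flag plus the row's values. */
    static final class Change {
        final boolean isAdd;     // true = add message, false = retract message
        final List<String> row;  // stand-in for a Flink Row

        Change(boolean isAdd, String... fields) {
            this.isAdd = isAdd;
            this.row = Arrays.asList(fields);
        }
    }

    /** Replays the changes in order and returns the rows that remain afterwards. */
    static List<List<String>> materialize(List<Change> changes) {
        List<List<String>> current = new ArrayList<>();
        for (Change change : changes) {
            if (change.isAdd) {
                current.add(change.row);
            } else {
                current.remove(change.row); // drops the first equal row, if any
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<Change> changes = Arrays.asList(
            new Change(true, "k1", "1"),   // add
            new Change(false, "k1", "1"),  // retract the same row
            new Change(true, "k1", "2"));  // add the updated row
        System.out.println(materialize(changes)); // prints [[k1, 2]]
    }
}
```

Whether the flag travels in a CRow or in a Tuple2 makes no difference to this logic; only the wrapper type differs.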

Best,
Jark



In reply to John Mathews (Thu, 18 Jun 2020 at 06:39).


Re: Blink Planner Retracting Streams

John Mathews
So the difference between Tuple2<Boolean, Row> and CRow is that CRow has a special TypeInformation, CRowTypeInfo, defined here: https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala#L32

It returns the type information of the underlying row, whereas the TypeInformation for Tuple2 contains the boolean retraction flag plus a nested type info for the row. So all downstream operations that rely on seeing just the row type info now break for us.
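
The shape difference described above can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: `FieldInfo`, `RowInfo`, `CRowLikeInfo` and `Tuple2LikeInfo` are hypothetical, heavily simplified stand-ins made up for illustration, not Flink's actual type-information classes, and the field names `key` and `id` are example values.

```java
import java.util.Arrays;
import java.util.List;

public class TypeInfoShapeSketch {

    /** Hypothetical, simplified stand-in for a composite type description. */
    interface FieldInfo {
        List<String> fieldNames();
    }

    /** Stand-in for the row's own type info: reports the row's fields. */
    static final class RowInfo implements FieldInfo {
        private final List<String> names;

        RowInfo(String... names) {
            this.names = Arrays.asList(names);
        }

        @Override
        public List<String> fieldNames() {
            return names;
        }
    }

    /** Stand-in for the CRow behaviour described above: delegates to the wrapped row. */
    static final class CRowLikeInfo implements FieldInfo {
        private final RowInfo row;

        CRowLikeInfo(RowInfo row) {
            this.row = row;
        }

        @Override
        public List<String> fieldNames() {
            return row.fieldNames();
        }
    }

    /** Stand-in for the Tuple2 behaviour: the flag and the row are two top-level fields. */
    static final class Tuple2LikeInfo implements FieldInfo {
        private final RowInfo row;

        Tuple2LikeInfo(RowInfo row) {
            this.row = row;
        }

        @Override
        public List<String> fieldNames() {
            return Arrays.asList("f0", "f1"); // f0 = flag, f1 = nested row
        }

        /** The row's fields are only reachable one level down. */
        RowInfo nested() {
            return row;
        }
    }

    public static void main(String[] args) {
        RowInfo row = new RowInfo("key", "id");
        System.out.println(new CRowLikeInfo(row).fieldNames());            // prints [key, id]
        System.out.println(new Tuple2LikeInfo(row).fieldNames());          // prints [f0, f1]
        System.out.println(new Tuple2LikeInfo(row).nested().fieldNames()); // prints [key, id]
    }
}
```

Code that inspects only the top-level field names sees the row's columns in the first case and the flag plus a nested row in the second, which is the mismatch being reported here.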

In reply to Jark Wu (Wed, Jun 17, 2020 at 9:23 PM).


Re: Blink Planner Retracting Streams

John Mathews
Below is a basic unit test of what we are trying to achieve. In short, we are trying to feed a retracting stream into a RetractStreamTableSink, which is easily done with the CRow from the original Flink planner but seems to be very difficult with the Blink planner.

The below fails with:
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.sink2 do not match.
Query schema: [f0: BOOLEAN, f1: ROW<`f0` STRING, `f1` STRING>]
Sink schema: [key: STRING, id: STRING]

but will succeed if you uncomment the CRow lines of code and run with the original table planner.

Any thoughts on how we can accomplish this?

@Test
public void retractStream() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, settings);

    Row row1 = new Row(2);
    row1.setField(0, "1");
    row1.setField(1, "1");

    SingleOutputStreamOperator<Row> source =
        executionEnvironment.fromCollection(ImmutableList.of(row1)).setParallelism(1);

    tableEnvironment.createTemporaryView("table1", source, "key, id");
    Table outputTable = tableEnvironment.sqlQuery("select id, key from table1");

    RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(outputTable, rowTypeInfo);

    // This code block below works on Flink planner but fails on Blink planner because Blink treats all non-tuples
    // as POJOs
    // SingleOutputStreamOperator<CRow> tuple2DataStream = tableEnvironment
    //     .toRetractStream(outputTable, rowTypeInfo)
    //     .map(value -> new CRow(value.f1, value.f0))
    //     .returns(new CRowTypeInfo(rowTypeInfo));

    tableEnvironment.createTemporaryView("outputTable", tuple2DataStream);
    // Create a retracting table sink
    TableSchema.Builder schemaBuilder = TableSchema.builder();
    schemaBuilder.field("key", DataTypes.STRING());
    schemaBuilder.field("id", DataTypes.STRING());
    TableSchema schema = schemaBuilder.build();
    RetractSink retractTableSink = new RetractSink(new CollectingTableSink(schema));
    tableEnvironment.registerTableSink("sink2", retractTableSink);
    // Wire up the output to the sink
    tableEnvironment.from("outputTable").insertInto("sink2");
    executionEnvironment.execute();
}

private static class RetractSink implements RetractStreamTableSink<Row> {

    private final AppendStreamTableSink<Row> delegate;

    RetractSink(AppendStreamTableSink<Row> delegate) {
        this.delegate = delegate;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return delegate.getOutputType();
    }

    @Override
    public TableSchema getTableSchema() {
        return delegate.getTableSchema();
    }

    @Override
    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
        return new TupleTypeInfo<>(Types.BOOLEAN(), getRecordType());
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        consumeDataStream(dataStream);
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        DataStream<Row> filteredAndMapped =
            dataStream.flatMap(new TupleMapper()).returns(getRecordType());

        return delegate.consumeDataStream(filteredAndMapped);
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        throw new UnsupportedOperationException();
    }
}

private static final class TupleMapper implements FlatMapFunction<Tuple2<Boolean, Row>, Row> {
    @Override
    public void flatMap(Tuple2<Boolean, Row> value, Collector<Row> out) {
        // Forward only add messages; retractions (f0 == false) are dropped.
        if (value.f0) {
            out.collect(value.f1);
        }
    }
}

In reply to John Mathews (Thu, Jun 18, 2020 at 10:21 AM).