How to create Row with RowTypeInfo

Soheil Pourbafrani
Hi

When creating a new DataSet of type Row, how can I set the RowTypeInfo of the row?

For example, when I create a new DataSet like this:

Row row = Row.of(1, new Timestamp(1), new Date(1));
System.out.println(env.fromElements(row).getType());
it prints:
Row(f0: Integer, f1: Timestamp, f2: Date)
Flink automatically labels the fields f0, f1, ..., but I want to specify the name of each field myself.

For comparison, when I create a new DataSet from a Table, it carries the table schema, including the field names:

DataSet<Row> res = tEnv.toDataSet(table, Row.class);
Row(id: Integer, time: Timestamp, name: String, age: Integer, grade: Double)

So is there any way to create a new DataSet<Row> with a custom RowTypeInfo?
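One way to attach your own field names, sketched here assuming the Flink 1.x DataSet API, is to build the `RowTypeInfo` yourself with the `RowTypeInfo(TypeInformation<?>[], String[])` constructor and pass it to `ExecutionEnvironment.fromCollection(Collection, TypeInformation)` instead of letting `fromElements` infer the type from the first element. The class name `NamedRowDataSet` and the field names `id`, `time`, `date` are hypothetical, and `Date` is assumed to be `java.sql.Date`.

```java
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Collections;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class NamedRowDataSet {

    // Explicit row type: field types in order, plus our own (hypothetical) field names.
    static RowTypeInfo namedType() {
        return new RowTypeInfo(
            new TypeInformation<?>[] {Types.INT, Types.SQL_TIMESTAMP, Types.SQL_DATE},
            new String[] {"id", "time", "date"});
    }

    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Row row = Row.of(1, new Timestamp(1), new Date(1));

        // Pass the type explicitly instead of letting fromElements infer f0, f1, f2.
        DataSet<Row> ds = env.fromCollection(Collections.singletonList(row), namedType());
        System.out.println(ds.getType());
    }
}
```

The field types must agree with the values actually stored in each `Row`; the names are free to choose, but must be unique.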

Re: How to create Row with RowTypeInfo

Caizhi Weng
I forgot to include the user mailing list in my earlier response, so I'm adding it now in case other users run into the same problem.

On Mon, Jul 15, 2019 at 2:56 AM Soheil Pourbafrani <[hidden email]> wrote:
Great!
I got it

Thanks

On Sun, Jul 14, 2019 at 8:26 PM Caizhi Weng <[hidden email]> wrote:
Hi Soheil,

From what I understand, because `Row` is not strongly typed and Flink currently does not treat `RowTypeInfo` specially when extracting the return types of map functions, you can't simply pass a map function to `dataSet.map()` and expect the result to carry a strongly typed `RowTypeInfo`; the type extractor falls back to a generic type for `Row`.

But if your mapping function implements the `ResultTypeQueryable` interface, Flink takes the result type directly from its `getProducedType` method (see `getUnaryOperatorReturnType` in `TypeExtractor`). So you can write code like the following to tell Flink the return type of your mapping function:

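import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.junit.Test;

@Test
public void myTest() {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // fromElements infers Row(f0: Integer, f1: Long, f2: String) from the values.
    DataSet<Row> dataSet = env.fromElements(
        Row.of(1, 1L, "a"),
        Row.of(2, 2L, "b"),
        Row.of(3, 3L, "c")
    );
    System.out.println(dataSet.getType());

    // A plain lambda: the type extractor can only produce a generic Row type.
    DataSet<Row> ds = dataSet.map(row -> Row.of(row.getField(0), row.getField(1)));
    System.out.println(ds.getType());

    // A function implementing ResultTypeQueryable: Flink uses getProducedType().
    DataSet<Row> ds2 = dataSet.map(new MyMappingFunc(dataSet));
    System.out.println(ds2.getType());
}

// Static, so the function does not capture (and need to serialize) the enclosing test instance.
private static class MyMappingFunc implements MapFunction<Row, Row>, ResultTypeQueryable<Row> {

    private final RowTypeInfo producedType;

    public MyMappingFunc(DataSet<Row> dataSet) {
        RowTypeInfo ti = (RowTypeInfo) dataSet.getType();
        // Keep both the types and the names of the first two input fields.
        producedType = new RowTypeInfo(
            new TypeInformation<?>[] {ti.getTypeAt(0), ti.getTypeAt(1)},
            new String[] {ti.getFieldNames()[0], ti.getFieldNames()[1]});
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return producedType;
    }

    @Override
    public Row map(Row value) throws Exception {
        return Row.of(value.getField(0), value.getField(1));
    }
}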
You can see that the type of `ds2` is the desired Row(f0: Integer, f1: Long), while the type of `ds` is only GenericType<org.apache.flink.types.Row>.

This is the only approach I can think of right now. There may be a simpler one; I'll ask the other Flink developers tomorrow. Perhaps in the future Flink will treat `RowTypeInfo` specially when deriving the return types of mapping functions.
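A possibly simpler alternative, not mentioned in the thread, is the `returns(...)` type hint available on DataSet operators such as `MapOperator`: it overrides whatever the type extractor inferred for the lambda, as long as it is called before the operator's type is used. The sketch assumes the Flink 1.x DataSet API; the class `ReturnsHint` and the helper `projectFirst` are hypothetical names. Like the function above, it copies the input's field names, so a schema such as (id: Integer, name: String, register_date: Date) would keep `id` and `name`.

```java
import java.util.Arrays;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class ReturnsHint {

    // Builds a row type holding the types and names of the first n input fields.
    static RowTypeInfo projectFirst(RowTypeInfo in, int n) {
        return new RowTypeInfo(
            Arrays.copyOf(in.getFieldTypes(), n),
            Arrays.copyOf(in.getFieldNames(), n));
    }

    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Row> dataSet = env.fromElements(Row.of(1, 1L, "a"), Row.of(2, 2L, "b"));
        RowTypeInfo inType = (RowTypeInfo) dataSet.getType();

        // Without returns(...), this lambda's result type would be a generic Row type.
        DataSet<Row> ds = dataSet
            .map(row -> Row.of(row.getField(0), row.getField(1)))
            .returns(projectFirst(inType, 2));
        System.out.println(ds.getType());
    }
}
```

Compared with `ResultTypeQueryable`, this keeps the mapping logic in a lambda; the trade-off is that the type has to be supplied at every call site rather than once inside the function class.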

On Sun, Jul 14, 2019 at 8:22 PM Soheil Pourbafrani <[hidden email]> wrote:
Hi Caizhi, 
thanks for your reply.

Maybe I didn't explain it well enough. I'm familiar with that method. My problem is this: suppose you have a dataset with the schema (id: Integer, name: String, register_date: Date). If I map it like this:

DataSet<Row> res = dataset.map(row -> {
    Row temp = Row.of(
        row.getField(0),
        row.getField(1));
    return temp;
});
then printing the type info of `res` with
System.out.println(res.getType());
gives:
GenericType<org.apache.flink.types.Row>

But I want the type info to be:
Row(id: Integer, name: String)

Is there any way to use the schema of the input dataset in a map operation to generate the new dataset?

On Sun, Jul 14, 2019 at 6:44 AM Caizhi Weng <[hidden email]> wrote:
Hi Soheil,

`BatchTableEnvironment` has a `toDataSet(Table, TypeInformation<T>)` method to which you can pass your own RowTypeInfo. Note that the TypeInformation you pass must match the field types of the table.
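A minimal sketch of that call, assuming the legacy `org.apache.flink.table.api.java.BatchTableEnvironment` of the Flink 1.8/1.9 era and a table whose columns have the types shown in the question (Integer, Timestamp, String, Integer, Double). The class `TableToNamedRows` and the new column names are hypothetical. The types must line up position by position with the table's columns; the names are the ones you would like the resulting `DataSet<Row>` to report.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class TableToNamedRows {

    // Types follow the table's column order; the names are hypothetical replacements.
    static RowTypeInfo customType() {
        return new RowTypeInfo(
            new TypeInformation<?>[] {
                Types.INT, Types.SQL_TIMESTAMP, Types.STRING, Types.INT, Types.DOUBLE},
            new String[] {"student_id", "event_time", "student_name", "student_age", "final_grade"});
    }

    // Converts the table with an explicit RowTypeInfo instead of Row.class.
    static DataSet<Row> toNamedDataSet(BatchTableEnvironment tEnv, Table table) {
        return tEnv.toDataSet(table, customType());
    }
}
```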

On Sun, Jul 14, 2019 at 5:03 AM Soheil Pourbafrani <[hidden email]> wrote the original question quoted at the top of this thread.