I forgot to add the user mailing list to my response. I'm adding it now in case other users want to solve this problem too...
Hi Soheil,
From what I understand, because `Row` is not strongly typed and Flink currently does not treat `RowTypeInfo` specially when extracting the return types of map functions, you can't simply pass a map function to `dataset.map()` and expect the result to carry a strongly typed `RowTypeInfo`.

But if your mapping function implements the `ResultTypeQueryable` interface, Flink can get the result type directly from its `getProducedType` method (see `getUnaryOperatorReturnType` in `TypeExtractor`). So you can write the following code to give Flink a hint about the return type of your mapping function.
@Test
public void myTest() {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, config());

    DataSet<Row> dataSet = env.fromElements(
        Row.of(1, 1L, "a"),
        Row.of(2, 2L, "b"),
        Row.of(3, 3L, "c")
    );
    System.out.println(dataSet.getType());

    // A plain lambda loses the field types: this prints GenericType<Row>.
    DataSet<Row> ds = dataSet.map(row -> Row.of(row.getField(0), row.getField(1)));
    System.out.println(ds.getType());

    // A mapper implementing ResultTypeQueryable reports its own RowTypeInfo.
    DataSet<Row> ds2 = dataSet.map(new MyMappingFunc(dataSet));
    System.out.println(ds2.getType());
}

// Static so that it does not capture the enclosing test class when Flink serializes it.
private static class MyMappingFunc implements MapFunction<Row, Row>, ResultTypeQueryable<Row> {

    private final RowTypeInfo producedType;

    public MyMappingFunc(DataSet<Row> dataSet) {
        RowTypeInfo ti = (RowTypeInfo) dataSet.getType();
        // Keep the types of the first two fields of the input schema.
        producedType = new RowTypeInfo(ti.getTypeAt(0), ti.getTypeAt(1));
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return producedType;
    }

    @Override
    public Row map(Row value) throws Exception {
        return Row.of(value.getField(0), value.getField(1));
    }
}

You can see that the type of `ds2` will be the desired Row(f0: Integer, f1: Long).
This is the method I can currently think of. Maybe there is a simpler one; I'll ask the other Flink developers tomorrow. Maybe the developers would like to treat `RowTypeInfo` specially when deriving the return types of mapping functions in the future.
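One possibly simpler alternative that comes to mind (just an untested sketch; it assumes the `returns()` type hint on DataSet operators accepts a RowTypeInfo) is to supply the type information right after the map call:

RowTypeInfo inputType = (RowTypeInfo) dataSet.getType();
// Keep only the first two fields of the input schema.
RowTypeInfo hintedType = new RowTypeInfo(inputType.getTypeAt(0), inputType.getTypeAt(1));

DataSet<Row> ds3 = dataSet
    .map(row -> Row.of(row.getField(0), row.getField(1)))
    // Explicit hint so the lambda's result is not treated as GenericType<Row>.
    .returns(hintedType);
System.out.println(ds3.getType());

If that works, it avoids writing a separate mapping class altogether.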
Hi Caizhi, thanks for your reply.
Maybe I didn't elaborate enough; I'm somewhat familiar with that method. My problem is this: suppose you have a dataset with the schema (id: Integer, name: String, register_date: Date). If I try to map it like the following:
DataSet<Row> res = dataset.map(row -> {
    Row temp = Row.of(
        row.getField(0),
        row.getField(1)
    );
    return temp;
});

then the type info of res will be:

System.out.println(res.getType());
GenericType<org.apache.flink.types.Row>
But I want the type info to show Row(id: Integer, name: String).
Is there any way to use the schema of the dataset in a map operation to generate a new dataset?
Hi Soheil,
There is a `toDataSet(Table, TypeInformation<?>)` method in `BatchTableEnvironment` to which you can pass your own RowTypeInfo. Note that the TypeInformation you pass must match the field types of the table.
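For example, something along these lines (a rough sketch only; the field names and types are made up to match your earlier example, and `tEnv`/`table` stand for your table environment and table, so adjust them to your actual schema):

// A RowTypeInfo with explicit field names and types matching the table schema.
RowTypeInfo rowType = new RowTypeInfo(
    new TypeInformation<?>[] {Types.INT, Types.STRING, Types.SQL_DATE},
    new String[] {"id", "name", "register_date"});

DataSet<Row> result = tEnv.toDataSet(table, rowType);
System.out.println(result.getType());  // should print something like Row(id: Integer, name: String, register_date: Date)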
Hi,
When creating a new DataSet of type Row, how can I specify the RowTypeInfo of the rows?
For example, when I create a new dataset like the following:

Row row = Row.of(1, new Timestamp(1), new Date(1));
System.out.println(env.fromElements(row).getType());

it results in:

Row(f0: Integer, f1: Timestamp, f2: Date)
While Flink automatically uses f0, f1, ... as labels, I want to specify the name of each field.
For example, when I create a new DataSet from a Table, it contains the Table schema too:
DataSet<Row> res = tEnv.toDataSet(table, Row.class);

Row(id: Integer, time: Timestamp, name: String, age: Integer, grade: Double)
So is there any way to create a new DataSet<Row> with a custom RowTypeInfo?
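To illustrate what I'm after (just a sketch of the idea; I don't know whether an explicit TypeInformation can be passed in like this, and the field names here are only examples):

// Desired: attach my own field names and types when building the DataSet.
RowTypeInfo rowType = new RowTypeInfo(
    new TypeInformation<?>[] {Types.INT, Types.SQL_TIMESTAMP, Types.SQL_DATE},
    new String[] {"id", "time", "register_date"});

DataSet<Row> ds = env.fromCollection(
    Collections.singletonList(Row.of(1, new Timestamp(1), new Date(1))),
    rowType);
System.out.println(ds.getType());  // hoping for: Row(id: Integer, time: Timestamp, register_date: Date)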