Reverse the order of fields in Flink SQL

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Reverse the order of fields in Flink SQL

yinhua.dai
This post was updated on .
I write a customized table source, and it emits some fields let's say f1, f2.

And then I just write to a sink with a reversed order of fields, as below:
*select f2, f1 from customTableSource*

And I found that it actually doesn't do the field reverse.


Then I tried with flink provided CsvTableSource and CsvTableSink, I found
that it can successfully reverse the order, and after some investigation I
found that it seems the function related with two things:
1. *CsvTableSource* implemented *ProjectableTableSource*
2. *RowCsvInputFormat* supports *selectedFields*

Do I have to do the two things as well in my custom table source to get the
reverse work?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Reverse the order of fields in Flink SQL

Timo Walther
Hi Yinhua,

your custom sink must implement
`org.apache.flink.table.sinks.TableSink#configure`. This method is
called when writing to a sink such that the sink can configure itself
for the reverse order. The methods `getFieldTypes` and `getFieldNames`
must then return the reconfigured schema; must match to the query result
schema.

Regards,
Timo


Am 23.10.18 um 09:47 schrieb yinhua.dai:

> I write a customized table source, and it emits some fields let's say f1, f2.
>
> And then I just write to a sink with a reversed order of fields, as below:
> *select f2, f1 from customTableSource*
>
> And I found that it actually doesn't do the field reverse.
>
>
> Then I tried with flink provided CsvTableSource and CsvTableSink, I found
> that it has no problem reverse the order, and after some investigation I
> found that it's related with two things:
> 1. *CsvTableSource* implemented *ProjectableTableSource*
> 2. *RowCsvInputFormat* supports *selectedFields*
>
> Do I have to do the two things as well in my custom table source to get the
> reverse work?
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Reply | Threaded
Open this post in threaded view
|

Re: Reverse the order of fields in Flink SQL

yinhua.dai
I think I have already done that in my custom sink.

 @Override
  public String[] getFieldNames() {
    return this.fieldNames;
  }

  @Override
  public TypeInformation<?>[] getFieldTypes() {

    return this.fieldTypes;
  }

@Override
  public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[]
fieldTypes) {
    this.fieldNames = fieldNames;
    this.fieldTypes = fieldTypes;
    return this;
  }



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Reverse the order of fields in Flink SQL

yinhua.dai
This post was updated on .
Hi Timo,

I wrote a simple testing code which reproduced the issue, please checkout
https://gist.github.com/yinhua-dai/143304464270afd19b6a926531f9acb1

In the sample I wrote a custom table source which just use RowCsvInputformat to create the
dataset, and use the provided CsvTableSink which apparently implemented the configure interface.


--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/