Properly using ConnectorDescriptor instead of registerTableSource


Nikola Hrusov
Hello,

I am trying to upgrade my cluster from Flink 1.9.2 to 1.10.1.
We are running batch jobs on it.
So far everything has been smooth, except that one of my method calls is now marked as deprecated.

Our code selects the contents of some ORC files and runs a query on them. It is something of this nature:

OrcTableSource orcTableSource = OrcTableSource.builder()
        .path(path, true)
        .forOrcSchema(ORC.getSchema())
        .withConfiguration(config)
        .build();

tableEnv.registerTableSource(targetTableName, orcTableSource);

Table targetTable = tableEnv.sqlQuery(whereSql);

return tableEnv.toDataSet(targetTable, Row.class);


The call "tableEnv.registerTableSource(targetTableName, orcTableSource);" shows as deprecated: Use connect(ConnectorDescriptor) instead.
I have looked into the release notes and docs regarding the table sources, but I cannot find how to properly replace my code with a non-deprecated equivalent.

I looked in the FLIP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module but it only says the method will be deprecated in favor of connect.
I tried looking in the release notes https://flink.apache.org/news/2020/02/11/release-1.10.0.html but I cannot find an example of a ConnectorDescriptor.
I looked here as well: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#orctablesource where it only shows how to build an OrcTableSource, but not how to query it. And if you look below that section, the examples for CsvTableSink, JDBCAppendTableSink and CassandraAppendTableSink all use registerTableSink, which is also deprecated.

My biggest issue is that ConnectorDescriptor requires 3 parameters:

* @param type string that identifies this connector
* @param version property version for backwards compatibility
* @param formatNeeded flag for basic validation of a needed format descriptor

and I do not know what to provide for them.

Can you please guide me to a page where I can read more about the ConnectorDescriptor or give me some information on:

1) How can I rewrite the code I have so that it does not use deprecated methods?
2) What do those parameters of ConnectorDescriptor mean?

Regards,
Nikola

Re: Properly using ConnectorDescriptor instead of registerTableSource

Timo Walther
Hi Nikola,

The reason for deprecating `registerTableSource` is that we aim to have
everything declarative in the Table API. A table program should simply
declare what it needs and the planner should find a suitable connector,
regardless of what the underlying class structure looks like. This might
also be beneficial in the future because dependencies need not be present
at the time of writing a program and can be loaded at a later stage.

In the last 2 releases we focused mostly on the SQL CREATE TABLE DDL.
Unfortunately, the connect() API has not received a major update so
far. This is planned for the next release. What is missing to make the
OrcTableSource work again is a table factory and descriptor. The
descriptor is just a helper class that compiles the wishes of a user
down to a map of string-string properties.
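
To make the "map of string-string properties" idea concrete, here is a minimal
plain-Java sketch. It is only an illustration under stated assumptions: the
class OrcDescriptorSketch is hypothetical and does not extend Flink's
ConnectorDescriptor, the connector type "orc-files" is a made-up identifier,
and the keys "connector.path" and "connector.recursive" are invented for this
example. The three fields type, version and formatNeeded mirror the three
constructor parameters asked about in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a descriptor. It is NOT a Flink class and does
// not extend ConnectorDescriptor; it only models the idea of flattening
// user choices into string-string properties.
final class OrcDescriptorSketch {
    private final String type;          // string that identifies the connector
    private final int version;          // property version for backwards compatibility
    private final boolean formatNeeded; // whether a separate format descriptor is expected
    private final String path;
    private final boolean recursive;

    OrcDescriptorSketch(String path, boolean recursive) {
        this.type = "orc-files";   // assumed identifier, not a built-in connector type
        this.version = 1;
        this.formatNeeded = false; // ORC files carry their own format in this sketch
        this.path = path;
        this.recursive = recursive;
    }

    boolean isFormatNeeded() {
        return formatNeeded;
    }

    // Flattens the descriptor into the properties a factory would later match on.
    Map<String, String> toProperties() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("connector.type", type);
        props.put("connector.property-version", String.valueOf(version));
        props.put("connector.path", path);                          // assumed key
        props.put("connector.recursive", String.valueOf(recursive)); // assumed key
        return props;
    }

    public static void main(String[] args) {
        new OrcDescriptorSketch("/data/orc", true)
                .toProperties()
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

A real descriptor would pass type, version and formatNeeded to the
ConnectorDescriptor constructor and contribute only the connector-specific
entries itself.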

I would suggest using the SQL DDL and creating a table factory that
matches against some string properties and configures your table source.
If you want to use connect(), you need a little helper POJO descriptor
as well. Here are some examples of how to create your own factory and
descriptor:

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablefactory
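
As a rough illustration of what "matches against some string properties"
means, the following plain-Java sketch models the lookup. None of these types
are Flink classes: SketchFactory, OrcSketchFactory, CsvSketchFactory and the
connector types "orc-files" and "csv-files" are hypothetical names, and
createSource returns a description string where a real factory would build
the OrcTableSource.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of factory discovery; none of these types are Flink classes.
final class FactoryMatchSketch {

    // Hypothetical, simplified counterpart of a table factory.
    interface SketchFactory {
        // Properties that must be present with exactly these values.
        Map<String, String> requiredContext();

        // Stands in for building the table source; returns a description only.
        String createSource(Map<String, String> properties);
    }

    static final class OrcSketchFactory implements SketchFactory {
        @Override
        public Map<String, String> requiredContext() {
            return Collections.singletonMap("connector.type", "orc-files");
        }

        @Override
        public String createSource(Map<String, String> properties) {
            return "ORC source reading " + properties.get("connector.path");
        }
    }

    static final class CsvSketchFactory implements SketchFactory {
        @Override
        public Map<String, String> requiredContext() {
            return Collections.singletonMap("connector.type", "csv-files");
        }

        @Override
        public String createSource(Map<String, String> properties) {
            return "CSV source reading " + properties.get("connector.path");
        }
    }

    // Returns the first factory whose required context is contained in the properties.
    static Optional<SketchFactory> find(List<SketchFactory> factories,
                                        Map<String, String> properties) {
        return factories.stream()
                .filter(f -> properties.entrySet().containsAll(f.requiredContext().entrySet()))
                .findFirst();
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("connector.type", "orc-files");
        properties.put("connector.path", "/data/orc");

        List<SketchFactory> factories =
                Arrays.asList(new CsvSketchFactory(), new OrcSketchFactory());
        String result = find(factories, properties)
                .map(f -> f.createSource(properties))
                .orElse("no matching factory");
        System.out.println(result);
    }
}
```

The point of the design is that the program only states properties; which
class ends up serving them is decided at lookup time.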

Regards,
Timo


On 16.05.20 18:33, Nikola Hrusov wrote:

> [...]