How to fix deprecation on registerTableSink

Flavio Pompermaier
Hello everybody,
I was trying to get rid of the deprecation warnings about using BatchTableEnvironment.registerTableSink() but I don't know how to proceed.

My current code does the following:

BatchTableEnvironment benv = BatchTableEnvironment.create(env);
benv.registerTableSink("outUsers", getFieldNames(), getFieldTypes(),
        new CsvTableSink(outputDir + "users.tsv", "\t", 1, WriteMode.OVERWRITE));
benv.executeSql("INSERT INTO `outUsers` SELECT * FROM users");

Initially I thought of porting the code to benv.connect(), because with it I can use IDE autocomplete, but I discovered that connect() is also deprecated in favor of executeSql(). Out of curiosity I tried connect() anyway and could not find a way to specify overwrite. Using INSERT OVERWRITE caused this error:

INSERT OVERWRITE requires OverwritableTableSink but actually got org.apache.flink.table.sinks.CsvTableSink

Probably executeSql() is the only non-deprecated way to register my sink, so I started writing the CREATE TABLE statement for it, but here too I have some open questions:

1) Do I really have to write a method myself that converts the schema into the corresponding DDL string? Is there a utility that already does that? My naive attempt was something like:

   private static String getCreateStatement(String tableName, UserToRow userToRow) {
       return "CREATE TABLE " + tableName + " (" + //
           userToRow.getSchema().toString() + // this does not work unfortunately
           ") WITH (" + //
           "'connector' = 'filesystem'," + //
           "'path' = 'file:///tmp/test.csv'," + //
           "'format' = 'csv'," + //
           "'sink.shuffle-by-partition.enable' = 'false'" + //
           ");";
   }
2) How can I solve the overwrite problem?
3) Is executeSql() really the only non-deprecated way to create a table?
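
For question 1, a minimal sketch of such a helper, in plain Java with no Flink dependency. The class name CreateTableDdl and the method render() are hypothetical (they are not part of Flink), and the column types are assumed to be already available as SQL type strings such as "INT" or "STRING". In Flink these strings could probably be derived from the schema's data types (for example via LogicalType#asSerializableString()), but that step is not shown or verified here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper (not part of Flink): renders a CREATE TABLE statement as a string. */
final class CreateTableDdl {

    private CreateTableDdl() {}

    /**
     * @param tableName table name, wrapped in backticks in the output
     * @param columns   column name -> SQL type string (e.g. "INT"), in column order
     * @param options   connector options for the WITH clause, in insertion order
     */
    static String render(String tableName, Map<String, String> columns, Map<String, String> options) {
        String cols = columns.entrySet().stream()
                .map(e -> quoteIdentifier(e.getKey()) + " " + e.getValue())
                .collect(Collectors.joining(", "));
        String opts = options.entrySet().stream()
                .map(e -> quoteLiteral(e.getKey()) + " = " + quoteLiteral(e.getValue()))
                .collect(Collectors.joining(", "));
        return "CREATE TABLE " + quoteIdentifier(tableName) + " (" + cols + ") WITH (" + opts + ")";
    }

    // Assumption: identifiers never contain a backtick, so no escaping is attempted.
    private static String quoteIdentifier(String name) {
        if (name.contains("`")) {
            throw new IllegalArgumentException("Backticks are not supported in identifiers: " + name);
        }
        return "`" + name + "`";
    }

    // String literals are wrapped in single quotes; an embedded single quote is doubled.
    private static String quoteLiteral(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        // Example columns are made up for illustration.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "INT");
        columns.put("name", "STRING");

        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "filesystem");
        options.put("path", "file:///tmp/test.csv");
        options.put("format", "csv");

        System.out.println(render("outUsers", columns, options));
    }
}
```

The resulting string can be passed to executeSql(). Using insertion-ordered maps keeps the column order stable, which matters for a CSV sink.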

Thanks in advance,
Flavio

Re: How to fix deprecation on registerTableSink

Flavio Pompermaier
Any advice on how to fix those problems?

Best,
Flavio

On Thu, Jan 21, 2021 at 4:03 PM Flavio Pompermaier <[hidden email]> wrote:
Hello everybody,
I was trying to get rid of the deprecation warnings about using BatchTableEnvironment.registerTableSink() but I don't know how to proceed.


Re: How to fix deprecation on registerTableSink

Timo Walther
Hi Flavio,

FLIP-129 will update the connect() API with a programmatic way of
defining tables. In the API we currently only support the DDL via
executeSql.

I would recommend implementing the Catalog interface. This interface has
a lot of methods, but you only need to implement a couple of methods for
returning a CatalogTable. There is also a CatalogTableImpl that is
basically the CREATE TABLE statement in a programmatic interface.

The missing updated connect() API is the reason why we haven't dropped
the registerTableSink yet. It is also fine to continue using it for now.

Regards,
Timo

On 25.01.21 09:40, Flavio Pompermaier wrote:

> Any advice on how to fix those problems?
>
> Best,
> Flavio

Re: How to fix deprecation on registerTableSink

Flavio Pompermaier
Great! Thanks for the detailed answer, Timo! I think I'll wait for the migration to finish before updating my code.
However, does using a catalog solve the CSV overwrite problem as well? I can't find a way to use INSERT OVERWRITE with a CSV sink via executeSql().

Best,
Flavio

On Mon, Jan 25, 2021 at 10:54 AM Timo Walther <[hidden email]> wrote:
Hi Flavio,

FLIP-129 will update the connect() API with a programmatic way of
defining tables. In the API we currently only support the DDL via
executeSql.


Re: How to fix deprecation on registerTableSink

Timo Walther
In the current Flink version, the OVERWRITE should be added to every
INSERT INTO statement. It is not part of the connector anymore. Maybe we
can introduce an option in the future to define the default connector
behavior (feel free to open an issue for this if you think this is
required).

However, the OVERWRITE clause might only be supported in the Blink
planner. And the BatchTableEnvironment is basically deprecated and
should be replaced with the unified TableEnvironment in batch mode.
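
As a rough illustration of the statement sequence described above, here is a minimal sketch in plain Java. It only assembles the two SQL statements and hands them to a callback. The class OverwriteExample, the method writeUsers() and the columns id and name are hypothetical. In a real job the callback would presumably be tableEnv::executeSql on a unified TableEnvironment created in batch mode, and whether the filesystem connector accepts INSERT OVERWRITE depends on the planner and Flink version in use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch (not part of Flink): issues a CREATE TABLE followed by INSERT OVERWRITE. */
final class OverwriteExample {

    private OverwriteExample() {}

    /**
     * @param executeSql callback that runs one SQL statement; in a Flink job this is
     *                   assumed to be tableEnv::executeSql
     * @param path       output location of the filesystem sink
     */
    static void writeUsers(Consumer<String> executeSql, String path) {
        // The sink is declared via DDL instead of registerTableSink().
        // The column list is made up for illustration.
        executeSql.accept(
                "CREATE TABLE `outUsers` (`id` INT, `name` STRING) WITH ("
                        + "'connector' = 'filesystem', "
                        + "'path' = '" + path + "', "
                        + "'format' = 'csv')");

        // Overwrite is requested per statement, not configured on the sink.
        executeSql.accept("INSERT OVERWRITE `outUsers` SELECT * FROM users");
    }

    public static void main(String[] args) {
        // Stand-in for a TableEnvironment: collect the statements and print them.
        List<String> statements = new ArrayList<>();
        writeUsers(statements::add, "file:///tmp/users");
        statements.forEach(System.out::println);
    }
}
```

Passing the executor as a callback keeps the sketch runnable without Flink on the classpath; it says nothing about how the statements behave once executed.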

Regards,
Timo

On 25.01.21 11:06, Flavio Pompermaier wrote:

> Great! Thanks for the detailed answer, Timo! I think I'll wait for the
> migration to finish before updating my code.
> However, does using a catalog solve the CSV overwrite problem as
> well? I can't find a way to use INSERT OVERWRITE with a CSV sink via
> executeSql().
>
> Best,
> Flavio