Flink Table API


Flink Table API

Flavio Pompermaier
Hi to all,
I was trying out the new Table API with the new Blink planner, but I found that the two planners do not expose exactly the same APIs: for example, I can no longer go back and forth between Tables and DataSet/DataStream (using tableEnv.toDataSet, for example); the kind of round trip I mean is sketched below.
Is this temporary behaviour, or will this functionality be removed sooner or later (since, from what I understand, the current Flink planner will be replaced by the Blink one)?
I like the idea of creating sources just from properties, but then I can't transform them into a DataSet/DataStream.
Also, the documentation seems quite messy about this transition phase...
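
Just to be explicit, with the old planner I could do something like this (a rough sketch from memory, imports omitted as in the program below):

public static void main(String[] args) throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

  // DataSet -> Table
  DataSet<Tuple2<Double, Double>> points = env.fromElements(Tuple2.of(1.0, 2.0));
  Table table = tableEnv.fromDataSet(points, "X, Y");

  // Table -> DataSet: this is the direction I can't find with the Blink planner
  DataSet<Row> rows = tableEnv.toDataSet(table, Row.class);
  rows.print();
}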

This is my test program:

public static void main(String[] args) throws Exception {
  // set up the batch execution environment
  EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
  TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);


  String createTableSql = "CREATE TABLE civici (\n" //
    + "\t`X` DOUBLE,\n" //
    + "\t`Y` DOUBLE\n" //
    + ") WITH (\n"//
    + "\t'connector.type' = 'filesystem',\n"//
    + "\t'connector.path' = 'file:///tmp/test.csv',\n"//
    + "\t'format.type' = 'csv',\n" //
    + "\t'format.fields.0.name' = 'X',\n"//
    + "\t'format.fields.0.data-type' = 'DOUBLE',\n" //
    + "\t'format.fields.1.name' = 'Y',\n"//
    + "\t'format.fields.1.data-type' = 'DOUBLE',\n"//
    + "\t'format.field-delimiter' = ',',\n"//
//    + "\t'format.line-delimiter' = '\n',\n"//
    + "\t'format.quote-character' = '\"',\n"//
//    +"\t'format.allow-comments' = 'true',"//
//    +"\t'format.ignore-parse-errors' = 'true',"//
    + "\t'format.ignore-first-line' = 'true'\n"//
//    +"\t'format.array-element-delimiter' = '|', "//
//    +"\t'format.escape-character' = '\\',"//
//    +"\t'format.null-literal' = 'n/a'"//
    + ")";
  System.out.println(createTableSql);
  bbTableEnv.sqlUpdate(createTableSql);

  Table table = bbTableEnv.sqlQuery("SELECT * FROM civici");
  List<Row> rows = TableUtils.collectToList(table);

  for (Row row : rows) {
    System.out.println(row);
  }
}


Best,
Flavio

Re: Flink Table API

Jark Wu-3
Hi Flavio,

There are two main entry points in the Table API: one is `TableEnvironment`, the other is `StreamTableEnvironment`.
- `TableEnvironment` is used for pure Table API & SQL programs.
- `StreamTableEnvironment` can be used to convert from/to DataStream, as in the sketch below.
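
Roughly like this (just a sketch against 1.10, imports omitted, please double-check the names against the documentation):

// Pure Table API & SQL: the unified TableEnvironment (Blink planner, batch or streaming mode).
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Table <-> DataStream conversion: StreamTableEnvironment on top of a StreamExecutionEnvironment
// (streaming mode only at the moment).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env, settings);

DataStream<Tuple2<Double, Double>> input = env.fromElements(Tuple2.of(1.0, 2.0));
Table table = streamTableEnv.fromDataStream(input, "X, Y");
DataStream<Row> output = streamTableEnv.toAppendStream(table, Row.class);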

These two interfaces will be kept, but `BatchTableEnvironment` will be removed in the future.

Currently, it's true that the Blink planner's batch mode can't convert into a DataStream, because `StreamTableEnvironment#create`
only accepts streaming mode (see the sketch below). But this is on the roadmap: once DataStream supports a set of batch operations,
I think we can make `StreamTableEnvironment` support batch mode.
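
In other words, something like this is rejected today; as far as I remember it fails with a TableException (sketch, the exact message may differ):

EnvironmentSettings batchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Expected to fail: StreamTableEnvironment#create checks that the settings are in streaming mode.
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env, batchSettings);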

If you want to convert into a DataStream with the Blink planner in batch mode, there is a tricky workaround: create a
`StreamTableEnvironment` via the `StreamTableEnvironmentImpl` constructor directly.
The SQL CLI works this way; you can take it as an example [1].

Best,
Jark


