Flink cannot recognized catalog set by registerCatalog.


Shu Su
Hi all,

I want to use a custom catalog named “ca1” and create a database under it. When I submit the SQL, it raises the following error:

    Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
... 7 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
... 26 more
   
It seems that Calcite cannot find the source object as expected. After debugging the code, I found that tableEnv.registerTableSource and registerTableSink always use a built-in catalog with a hard-coded catalog name (default_catalog) and database name (default_database), and tableEnv.registerCatalog does not change this behavior. Is this the intended behavior? If I don’t want to use the default built-in catalog and database, is there another way to do this?

GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
tableEnv.registerCatalog(catalog.getName(), catalog); // does not change the built-in catalog!
tableEnv.useCatalog(catalog.getName());
catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
tableEnv.useDatabase("db1");

tableEnv.connect(sourceKafka)
    .withFormat(csv)
    .withSchema(schema2)
    .inAppendMode()
    .registerTableSource("orderstream");

tableEnv.connect(sinkKafka)
    .withFormat(csv)
    .withSchema(schema2)
    .inAppendMode()
    .registerTableSink("sinkstream");

String sql = "insert into ca1.db1.sinkstream " +
    "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from ca1.db1.orderstream " +
    "group by tumble(ts, INTERVAL '5' SECOND), data";

tableEnv.sqlUpdate(sql);
tableEnv.execute("test");

Thanks,
Simon

Re: Flink cannot recognized catalog set by registerCatalog.

Dawid Wysakowicz-2

Hi Simon,

First of all, for a more thorough discussion you might want to have a look at this thread: https://lists.apache.org/thread.html/b450df1a7bf10187301820e529cbc223ce63f233c1af0f0c7415e62b@%3Cdev.flink.apache.org%3E

TL;DR: All objects registered with registerTable/registerTableSource are temporary objects that do not have a serializable form and therefore can only be stored in an in-memory catalog. The useCatalog/useDatabase methods are experimental APIs in the upcoming 1.9 release.

If you want to be sure that tables are stored in a given catalog, you can either register them directly via tEnv.getCatalog().createTable() or try using SQL DDL.
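
The behavior described above can be illustrated with a small toy model. This is only a sketch under stated assumptions: ToyCatalogManager and its methods are hypothetical names invented for illustration and are not Flink classes. The model only mimics the rule from this thread that registerTableSource/registerTableSink always write to the built-in catalog, while creating a table directly in a catalog stores it where the caller says.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model only: hypothetical classes, not Flink's API or implementation.
class ToyCatalogManager {
    // catalog name -> set of "database.table" entries
    private final Map<String, Set<String>> catalogs = new HashMap<>();
    private final String builtInCatalog;
    private final String builtInDatabase;

    ToyCatalogManager(String builtInCatalog, String builtInDatabase) {
        this.builtInCatalog = builtInCatalog;
        this.builtInDatabase = builtInDatabase;
        catalogs.put(builtInCatalog, new TreeSet<>());
    }

    // Stands in for registerCatalog: it only adds a catalog, nothing else changes.
    void registerCatalog(String name) {
        catalogs.putIfAbsent(name, new TreeSet<>());
    }

    // Stands in for registerTableSource/registerTableSink:
    // the table always lands in the built-in catalog and database.
    void registerTemporaryTable(String table) {
        catalogs.get(builtInCatalog).add(builtInDatabase + "." + table);
    }

    // Stands in for creating a table directly in a catalog (or via DDL).
    void createTable(String catalog, String database, String table) {
        Set<String> tables = catalogs.get(catalog);
        if (tables == null) {
            throw new IllegalArgumentException("Unknown catalog: " + catalog);
        }
        tables.add(database + "." + table);
    }

    boolean tableExists(String catalog, String database, String table) {
        Set<String> tables = catalogs.get(catalog);
        return tables != null && tables.contains(database + "." + table);
    }
}

class ToyCatalogDemo {
    public static void main(String[] args) {
        ToyCatalogManager manager =
            new ToyCatalogManager("default_catalog", "default_database");
        manager.registerCatalog("ca1");
        manager.registerTemporaryTable("orderstream");

        // The lookup from the failing query: prints false
        System.out.println(manager.tableExists("ca1", "db1", "orderstream"));
        // Where the table actually went: prints true
        System.out.println(
            manager.tableExists("default_catalog", "default_database", "orderstream"));

        // Creating the table directly in the custom catalog: prints true
        manager.createTable("ca1", "db1", "orderstream");
        System.out.println(manager.tableExists("ca1", "db1", "orderstream"));
    }
}
```

In this model the first lookup fails for the same reason as the reported "Object 'orderstream' not found within 'ca1.db1'" error: the table was never placed in ca1.db1.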

Best,

Dawid

On 12/08/2019 09:37, Simon Su wrote:
[...]


Re: Flink cannot recognized catalog set by registerCatalog.

Xuefu Z
In reply to this post by Shu Su
Hi Simon,

Thanks for reporting the problem. There are some rough edges around the catalog API and table environments, and we will be improving them after the 1.9 release.

Nevertheless, tableEnv.registerCatalog() just puts a new catalog into Flink's CatalogManager. It doesn't change the default catalog/database as you expected. To switch to your newly registered catalog, you can call tableEnv.useCatalog() and .useDatabase().

As an alternative, you can fully qualify your table name with the "catalog.db.table" syntax without switching the current catalog/database.
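
The two options above (switching the current catalog/database, or fully qualifying the name) can be sketched as a simple name-qualification rule. This is a minimal illustration, not Flink code: ToyIdentifierResolver and qualify are hypothetical names, and the sketch assumes plain dot-separated identifiers without backtick escaping.

```java
// Toy sketch only: hypothetical helper, not part of Flink.
class ToyIdentifierResolver {

    // Expands a 1-, 2-, or 3-part table name against the current catalog/database.
    // Assumes the parts themselves contain no dots (no backtick-escaped names).
    static String qualify(String name, String currentCatalog, String currentDatabase) {
        String[] parts = name.split("\\.");
        switch (parts.length) {
            case 1:
                // "table" -> current catalog and current database
                return currentCatalog + "." + currentDatabase + "." + parts[0];
            case 2:
                // "db.table" -> current catalog
                return currentCatalog + "." + parts[0] + "." + parts[1];
            case 3:
                // "catalog.db.table" -> already fully qualified
                return name;
            default:
                throw new IllegalArgumentException("Expected 1 to 3 name parts: " + name);
        }
    }

    public static void main(String[] args) {
        // After useCatalog("ca1") and useDatabase("db1"): prints ca1.db1.orderstream
        System.out.println(qualify("orderstream", "ca1", "db1"));
        // Fully qualified name, current catalog/database ignored: prints ca1.db1.orderstream
        System.out.println(
            qualify("ca1.db1.orderstream", "default_catalog", "default_database"));
    }
}
```

Note that qualification only decides where a name is looked up. It does not move a table that was registered elsewhere, so a lookup in ca1.db1 can still fail if the table lives in another catalog.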

Please try those and let me know if you find new problems.

Thanks,
Xuefu



On Mon, Aug 12, 2019 at 12:38 AM Simon Su <[hidden email]> wrote:
[...]



--
Xuefu Zhang

"In Honey We Trust!"

Re: Flink cannot recognized catalog set by registerCatalog.

Shu Su
Hi Xuefu

Thanks for your reply.

Actually, I have already tried what you advised: I called tableEnv.useCatalog and useDatabase, and I also used “catalogname.databasename.tableName” in SQL. I think the root cause is that tableEnv.registerTableSource always uses the built-in catalog and database rather than the custom ones. So if I want to use custom ones, I have to write code like this:

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
    EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .withBuiltInCatalogName("ca1")
        .withBuiltInDatabaseName("db1")
        .build());

As Dawid said, if I want to store tables in my custom catalog, I can call catalog.createTable or use DDL.

Thanks,
Simon

On 08/13/2019 02:55, Xuefu Z <[hidden email]> wrote:
[...]

Re: Flink cannot recognized catalog set by registerCatalog.

Xuefu Z
Yes, tableEnv.registerTable(_) etc. always register in the default catalog. To create a table in your custom catalog, you can use tableEnv.sqlUpdate("create table ....").

Thanks,
Xuefu

On Mon, Aug 12, 2019 at 6:17 PM Simon Su <[hidden email]> wrote:
Hi Xuefu

Thanks for you reply. 

Actually I have tried it as your advises. I have tried to call tableEnv.useCatalog and useDatabase. Also I have tried to use “catalogname.databasename.tableName”  in SQL. I think the root cause is that when I call tableEnv.registerTableSource, it’s always use a “build-in”
Catalog and Database rather than the custom one. So if I want to use a custom one, I have to write code like this:

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.withBuiltInCatalogName("ca1")
.withBuiltInDatabaseName("db1")
.build());

As Dawid said, if I want to store in my custom catalog, I can call catalog.createTable or using DDL.

Thanks,
SImon

On 08/13/2019 02:55[hidden email] wrote:
Hi Simon,

Thanks for reporting the problem. There is some rough edges around catalog API and table environments, and we are improving post 1.9 release.

Nevertheless, tableEnv.registerCatalog() is just to put a new catalog in Flink's CatalogManager, It doens't change the default catalog/database as you expected. To switch to your newly registered catalog, you could call tableEnv.useCatalog() and .useDatabase().

As an alternative, you could fully qualify your table name with a "catalog.db.table" syntax without switching current catalog/database.

Please try those and let me know if you find new problems.

Thanks,
Xuefu



On Mon, Aug 12, 2019 at 12:38 AM Simon Su <[hidden email]> wrote:
Hi All
    I want to use a custom catalog by setting the name “ca1” and create a database under this catalog. When I submit the
SQL, and it raises the error like :


    Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
... 7 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
... 26 more

It seems that Calcite cannot find the source object as expected, After I debug the code I found that when using tableEnv.registerTableSource or registerTableSink, It will use a build-in catalog with a hard-code catalog name ( default-catalog ) and database name ( default_database ) while tableEnv.registerCatalog here cannot change this behaviros, So is this a reasonable behaviors ? If I don’t want to use default build-in catalog and database, is there any other ways to do this ?


GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
tableEnv.registerCatalog(catalog.getName(), catalog); // does not change the built-in catalog!
tableEnv.useCatalog(catalog.getName());
catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
tableEnv.useDatabase("db1");

// Expected to land in ca1.db1, but the lookup below fails.
tableEnv.connect(sourceKafka)
    .withFormat(csv)
    .withSchema(schema2)
    .inAppendMode()
    .registerTableSource("orderstream");

tableEnv.connect(sinkKafka)
    .withFormat(csv)
    .withSchema(schema2)
    .inAppendMode()
    .registerTableSink("sinkstream");

String sql = "insert into ca1.db1.sinkstream " +
    "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from ca1.db1.orderstream " +
    "group by tumble(ts, INTERVAL '5' SECOND), data";

tableEnv.sqlUpdate(sql);
tableEnv.execute("test");


Thanks,
Simon



--
Xuefu Zhang

"In Honey We Trust!"



Re: Flink cannot recognized catalog set by registerCatalog.

Jark Wu-3
I think we might need to improve the Javadoc of tableEnv.registerTableSource/registerTableSink.
Currently, the comment says:

"Registers an external TableSink with already configured field names and field types in this TableEnvironment's catalog."

But which catalog? The current one or the default in-memory one?
I think it would be better to improve the description and add a NOTE to it.

Regards,
Jark

On Tue, 13 Aug 2019 at 10:52, Xuefu Z <[hidden email]> wrote:
Yes, tableEnv.registerTable(_) etc. always register in the default catalog.
To create a table in your custom catalog, you could use
tableEnv.sqlUpdate("create table ....").

Thanks,
Xuefu
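
The behavior described above can be illustrated with a toy model in plain Java. This is only a sketch under stated assumptions: the class CatalogRegistrationSketch and all of its methods are hypothetical and are not Flink's CatalogManager. registerTable stands in for registerTableSource/registerTableSink as described in this thread for 1.9 (always the built-in catalog), and createTable stands in for catalog.createTable or DDL against a named catalog.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model only: hypothetical names, not Flink's actual catalog classes. */
final class CatalogRegistrationSketch {
    static final String BUILT_IN_CATALOG = "default_catalog";
    static final String BUILT_IN_DATABASE = "default_database";

    // catalog name -> database name -> table names
    private final Map<String, Map<String, Set<String>>> catalogs = new HashMap<>();

    CatalogRegistrationSketch() {
        createDatabase(BUILT_IN_CATALOG, BUILT_IN_DATABASE);
    }

    /** Creates the catalog and database entries if they do not exist yet. */
    void createDatabase(String catalog, String database) {
        catalogs.computeIfAbsent(catalog, c -> new HashMap<>())
                .computeIfAbsent(database, d -> new HashSet<>());
    }

    /** Models the 1.9 register* calls: the table always goes to the built-in catalog. */
    void registerTable(String table) {
        catalogs.get(BUILT_IN_CATALOG).get(BUILT_IN_DATABASE).add(table);
    }

    /** Models catalog.createTable or DDL: the table goes where the caller says. */
    void createTable(String catalog, String database, String table) {
        createDatabase(catalog, database);
        catalogs.get(catalog).get(database).add(table);
    }

    /** Looks up a fully qualified table name. */
    boolean exists(String catalog, String database, String table) {
        return catalogs.getOrDefault(catalog, new HashMap<>())
                .getOrDefault(database, new HashSet<>())
                .contains(table);
    }

    public static void main(String[] args) {
        CatalogRegistrationSketch env = new CatalogRegistrationSketch();
        env.createDatabase("ca1", "db1");
        env.registerTable("orderstream");
        System.out.println(env.exists("ca1", "db1", "orderstream"));
        System.out.println(env.exists(BUILT_IN_CATALOG, BUILT_IN_DATABASE, "orderstream"));
        env.createTable("ca1", "db1", "orderstream");
        System.out.println(env.exists("ca1", "db1", "orderstream"));
    }
}
```

In this model the first lookup fails for the same reason as the reported "Object 'orderstream' not found within 'ca1.db1'" error: the table was registered, but in the built-in catalog. Only the explicit createTable call puts it under ca1.db1.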

On Mon, Aug 12, 2019 at 6:17 PM Simon Su <[hidden email]> wrote:

> Hi Xuefu
>
> Thanks for your reply.
>
> Actually, I have tried what you advised. I have tried calling
> tableEnv.useCatalog and useDatabase. I have also tried using
> “catalogname.databasename.tableName” in SQL. I think the root cause is
> that when I call tableEnv.registerTableSource, it always uses a “built-in”
> catalog and database rather than the custom one. So if I want to use a
> custom one, I have to write code like this:
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
>     EnvironmentSettings.newInstance()
>         .useBlinkPlanner()
>         .inStreamingMode()
>         .withBuiltInCatalogName("ca1")
>         .withBuiltInDatabaseName("db1")
>         .build());
>
>
> As Dawid said, if I want to store tables in my custom catalog, I can call
> catalog.createTable or use DDL.
>
> Thanks,
> Simon
>
> On 08/13/2019 02:55, Xuefu Z <[hidden email]> wrote:
>
> Hi Simon,
>
> Thanks for reporting the problem. There are some rough edges around the catalog
> API and table environments, and we are improving them after the 1.9 release.
>
> Nevertheless, tableEnv.registerCatalog() just puts a new catalog in
> Flink's CatalogManager. It doesn't change the default catalog/database as
> you expected. To switch to your newly registered catalog, you could call
> tableEnv.useCatalog() and .useDatabase().
>
> As an alternative, you could fully qualify your table name with a
> "catalog.db.table" syntax without switching the current catalog/database.
>
> Please try those and let me know if you find new problems.
>
> Thanks,
> Xuefu
>
>
>
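
The two options mentioned in the quoted reply (switching the current catalog/database, or fully qualifying the name) can be pictured as one name-expansion rule. The following is a minimal sketch, assuming a short name is completed from the current catalog and database; the class IdentifierQualificationSketch and its qualify method are hypothetical and are not Flink's actual resolution code. Quoted identifiers that contain dots are not handled.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model only: a hypothetical helper, not Flink's real identifier resolution. */
final class IdentifierQualificationSketch {

    /** Expands a dotted table path to [catalog, database, table]. */
    static List<String> qualify(String path, String currentCatalog, String currentDatabase) {
        String[] parts = path.split("\\.");
        switch (parts.length) {
            case 1: // table only: use the current catalog and database
                return Arrays.asList(currentCatalog, currentDatabase, parts[0]);
            case 2: // database.table: use the current catalog
                return Arrays.asList(currentCatalog, parts[0], parts[1]);
            case 3: // already fully qualified: current settings are ignored
                return Arrays.asList(parts[0], parts[1], parts[2]);
            default:
                throw new IllegalArgumentException("Expected 1 to 3 name parts: " + path);
        }
    }

    public static void main(String[] args) {
        // After useCatalog("ca1") and useDatabase("db1"), a short name is enough.
        System.out.println(qualify("orderstream", "ca1", "db1"));
        // Without switching, the fully qualified name points at the same place.
        System.out.println(qualify("ca1.db1.orderstream", "default_catalog", "default_database"));
    }
}
```

Both calls yield the same three-part name, which is why either approach selects ca1.db1 for lookup. Neither changes where a table was registered in the first place.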
> On Mon, Aug 12, 2019 at 12:38 AM Simon Su <[hidden email]> wrote:
>

Re: Flink cannot recognized catalog set by registerCatalog.

Shu Su
Hi Jark

Thanks for your reply. 

It's weird that tableEnv provides an API called "registerCatalog", but it does not work in some cases (like mine).
Do you think it's feasible to unify this behavior? I think the documentation is necessary, but a unified way to use tableEnv is also important.

Thanks,
Simon

On 08/13/2019 12:27, [hidden email] wrote:

Re: Flink cannot recognized catalog set by registerCatalog.

Jark Wu-3
Hi Simon,

This is a temporary workaround for the 1.9 release. We will fix the behavior in 1.10; see FLINK-13461.

Regards,
Jark

On Tue, 13 Aug 2019 at 13:57, Simon Su <[hidden email]> wrote:

Re: Flink cannot recognized catalog set by registerCatalog.

Shu Su
OK, thanks Jark.

Thanks,
Simon

On 08/13/2019 14:05, [hidden email] wrote:
Hi Simon,

This is a temporary workaround for 1.9 release. We will fix the behavior in 1.10, see FLINK-13461.

Regards,
Jark

On Tue, 13 Aug 2019 at 13:57, Simon Su <[hidden email]> wrote:
Hi Jark

Thanks for your reply. 

It’s weird that In this case the tableEnv provide the api called “registerCatalog”, but it does not work in some cases ( like my cases ).
Do you think it’s feasible to unify this behaviors ? I think the document is necessary, but a unify way to use tableEnv is also important.

Thanks,
SImon

On 08/13/2019 12:27[hidden email] wrote:
I think we might need to improve the javadoc of tableEnv.registerTableSource/registerTableSink. 
Currently, the comment says 

"Registers an external TableSink with already configured field names and field types in this TableEnvironment's catalog."

But, what catalog? The current one or default in-memory one? 
I think, it would be better to improve the description and add a NOTE on it. 

Regards,
Jark

On Tue, 13 Aug 2019 at 10:52, Xuefu Z <[hidden email]> wrote:
Yes, tableEnv.registerTable(_) etc always registers in the default catalog.
To create table in your custom catalog, you could use
tableEnv.sqlUpdate("create table ....").

Thanks,
Xuefu

On Mon, Aug 12, 2019 at 6:17 PM Simon Su <[hidden email]> wrote:

> Hi Xuefu
>
> Thanks for you reply.
>
> Actually I have tried it as your advises. I have tried to call
> tableEnv.useCatalog and useDatabase. Also I have tried to use
> “catalogname.databasename.tableName”  in SQL. I think the root cause is
> that when I call tableEnv.registerTableSource, it’s always use a “build-in”
> Catalog and Database rather than the custom one. So if I want to use a
> custom one, I have to write code like this:
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
>     EnvironmentSettings.newInstance()
>         .useBlinkPlanner()
>         .inStreamingMode()
>         .withBuiltInCatalogName("ca1")
>         .withBuiltInDatabaseName("db1")
>         .build());
>
>
> As Dawid said, if I want to store in my custom catalog, I can call
> catalog.createTable or using DDL.
>
> Thanks,
> SImon
>
> On 08/13/2019 02:55,Xuefu Z<[hidden email]> <[hidden email]> wrote:
>
> Hi Simon,
>
> Thanks for reporting the problem. There is some rough edges around catalog
> API and table environments, and we are improving post 1.9 release.
>
> Nevertheless, tableEnv.registerCatalog() is just to put a new catalog in
> Flink's CatalogManager, It doens't change the default catalog/database as
> you expected. To switch to your newly registered catalog, you could call
> tableEnv.useCatalog() and .useDatabase().
>
> As an alternative, you could fully qualify your table name with a
> "catalog.db.table" syntax without switching current catalog/database.
>
> Please try those and let me know if you find new problems.
>
> Thanks,
> Xuefu
>
>
>
> On Mon, Aug 12, 2019 at 12:38 AM Simon Su <[hidden email]> wrote:
>
>> Hi All
>>     I want to use a custom catalog by setting the name “ca1” and create a
>> database under this catalog. When I submit the
>> SQL, and it raises the error like :
>>
>>
>>     Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: SQL validation failed. From
>> line 1, column 98 to line 1, column 116: Object 'orderstream' not found
>> within 'ca1.db1'
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
>> at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
>> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
>> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
>> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
>> at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
>> at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
>> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
>> at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
>> ... 7 more
>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
>> ... 26 more
>>
>> It seems that Calcite cannot find the source object as expected. After
>> debugging the code, I found that tableEnv.registerTableSource and
>> registerTableSink always use a built-in catalog with a hard-coded catalog
>> name (default_catalog) and database name (default_database), and that
>> tableEnv.registerCatalog does not change this behavior. Is this the
>> intended behavior? If I don't want to use the default built-in catalog and
>> database, is there any other way to do this?
>>
>>
>> GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
>> // Registering the catalog does not change the built-in catalog!
>> tableEnv.registerCatalog(catalog.getName(), catalog);
>> tableEnv.useCatalog(catalog.getName());
>> catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
>> tableEnv.useDatabase("db1");
>>
>> tableEnv.connect(sourceKafka)
>> .withFormat(csv)
>> .withSchema(schema2)
>> .inAppendMode()
>> .registerTableSource("orderstream");
>>
>> tableEnv.connect(sinkKafka)
>> .withFormat(csv)
>> .withSchema(schema2)
>> .inAppendMode()
>> .registerTableSink("sinkstream");
>>
>> String sql = "insert into ca1.db1.sinkstream " +
>> "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from ca1.db1.orderstream " +
>> "group by tumble(ts, INTERVAL '5' SECOND), data";
>>
>> tableEnv.sqlUpdate(sql);
>> tableEnv.execute("test");
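
To make the reported behavior concrete, here is a toy model of the name resolution described above. It is only a sketch under stated assumptions and is not Flink's implementation: the class CatalogSketch and all of its methods are hypothetical, and it assumes, as reported in this thread, that registerTableSource always writes into the built-in catalog and database regardless of the current catalog.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical toy model of catalog resolution; NOT Flink code.
class CatalogSketch {
    static final String BUILT_IN_CATALOG = "default_catalog";
    static final String BUILT_IN_DATABASE = "default_database";

    // catalog name -> database name -> table name -> table description
    private final Map<String, Map<String, Map<String, String>>> catalogs = new HashMap<>();
    private String currentCatalog = BUILT_IN_CATALOG;
    private String currentDatabase = BUILT_IN_DATABASE;

    CatalogSketch() {
        createDatabase(BUILT_IN_CATALOG, BUILT_IN_DATABASE);
    }

    // Creates the catalog and database if they do not exist yet.
    void createDatabase(String catalog, String database) {
        catalogs.computeIfAbsent(catalog, c -> new HashMap<>())
                .computeIfAbsent(database, d -> new HashMap<>());
    }

    // Switches the current catalog and database (like useCatalog + useDatabase).
    void use(String catalog, String database) {
        if (!catalogs.getOrDefault(catalog, Collections.emptyMap()).containsKey(database)) {
            throw new IllegalArgumentException("Unknown path " + catalog + "." + database);
        }
        currentCatalog = catalog;
        currentDatabase = database;
    }

    // Assumption taken from the report above: the current catalog and
    // database are ignored and the table lands in the built-in path.
    void registerTableSource(String name, String description) {
        catalogs.get(BUILT_IN_CATALOG).get(BUILT_IN_DATABASE).put(name, description);
    }

    // Resolves a fully qualified name, as the validator does for ca1.db1.orderstream.
    Optional<String> lookup(String catalog, String database, String table) {
        return Optional.ofNullable(
                catalogs.getOrDefault(catalog, Collections.emptyMap())
                        .getOrDefault(database, Collections.emptyMap())
                        .get(table));
    }

    public static void main(String[] args) {
        CatalogSketch env = new CatalogSketch();
        env.createDatabase("ca1", "db1");
        env.use("ca1", "db1");
        env.registerTableSource("orderstream", "kafka source");

        // The table is not under the path the SQL statement refers to.
        System.out.println(env.lookup("ca1", "db1", "orderstream").isPresent()); // false
        // It was stored under the built-in path instead.
        System.out.println(
                env.lookup(BUILT_IN_CATALOG, BUILT_IN_DATABASE, "orderstream").isPresent()); // true
    }
}
```

In this model the lookup for ca1.db1.orderstream fails for the same reason as the validation error above: the table exists, but only under the built-in path. Whether a real job can work around this by referring to the built-in path, or by creating the table directly in the custom catalog, depends on the Flink version and is not something this sketch shows.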
>>
>>
>> Thanks,
>> Simon
>>
>>
>
> --
> Xuefu Zhang
>
> "In Honey We Trust!"
>
>

--
Xuefu Zhang

"In Honey We Trust!"