Using Hive UDFs

Using Hive UDFs

Youngwoo Kim (김영우)
Hi,

I've configured a Hive metastore so that I can use HiveCatalog in a streaming application. So far, most of the Hive integration features are working fine.

However, I have a problem using Hive UDFs. I have already completed the prerequisites for using the Hive geospatial UDFs [1].

As a sanity check, I ran a query like the one below:

tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");


I got an exception like this:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. From line 1, column 18 to line 1, column 63: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 18 to line 1, column 63: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
    at com.skt.chiron.FlinkApp.main(FlinkApp.java:67)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    ... 11 more
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 18 to line 1, column 63: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5043)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1838)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:321)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1756)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1741)
    at org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:606)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:244)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1756)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1741)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:440)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4205)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3474)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
    ... 21 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
    ... 51 more


(snip)


Also, none of these functions appear in the output of 'SHOW FUNCTIONS':

tableEnv.executeSql("SHOW FUNCTIONS").print();


......

(snip)



Registering the functions explicitly does not work for me:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalogFunction(TableEnvironmentImpl.java:1459)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1009)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
    at com.skt.chiron.FlinkApp.main(FlinkApp.java:58)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    ... 11 more

(snip)



I'd like to find out why the functions are missing. The Flink (version 1.12.2) job cluster runs on a Kubernetes cluster via the Flink operator, and a standalone metastore runs solely for the Flink cluster, without any Hive deployment.


Thanks,

Youngwoo


Re: Using Hive UDFs

Shengkai Fang
Hi.

The order in which the modules are loaded may affect how the function is resolved.
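
To illustrate why order can matter, here is a toy model (a simplified sketch, not Flink's actual resolution code). It assumes that loaded modules are consulted in load order, so the first module that provides a given function name wins. The class `ModuleOrderSketch`, the module names, and the function lists are all made up for the example.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Toy model only: names and behaviour are assumptions, not Flink internals.
final class ModuleOrderSketch {
    // LinkedHashMap keeps insertion order, standing in for module load order.
    private final Map<String, Set<String>> modules = new LinkedHashMap<>();

    // Registers a hypothetical module together with the function names it provides.
    public void loadModule(String moduleName, String... functionNames) {
        Set<String> names = new HashSet<>();
        for (String name : functionNames) {
            names.add(name.toLowerCase(Locale.ROOT));
        }
        modules.put(moduleName, names);
    }

    // Returns the first loaded module that provides the function, if any.
    public Optional<String> resolve(String functionName) {
        String key = functionName.toLowerCase(Locale.ROOT);
        for (Map.Entry<String, Set<String>> entry : modules.entrySet()) {
            if (entry.getValue().contains(key)) {
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        ModuleOrderSketch sketch = new ModuleOrderSketch();
        sketch.loadModule("core", "concat", "upper");
        sketch.loadModule("hive", "concat", "get_json_object");
        System.out.println(sketch.resolve("concat"));          // Optional[core]
        System.out.println(sketch.resolve("get_json_object")); // Optional[hive]
        System.out.println(sketch.resolve("st_point"));        // Optional.empty
    }
}
```

In this model, loading "hive" before "core" would flip the answer for `concat`, and a name that no module provides stays unresolved at the module level.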


On Wed, Apr 28, 2021 at 10:50 AM Youngwoo Kim (김영우) <[hidden email]> wrote:

Re: Using Hive UDFs

Rui Li
Hi Youngwoo,

A catalog function is associated with a specific catalog and database. Assuming you have created the function ST_Point in your metastore, could you verify that the current catalog is your HiveCatalog and that the current database is the one in which ST_Point is registered?
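
A small helper along these lines could make that check explicit. It is only a sketch: `FunctionLookup` and `isListed` are hypothetical names, and the case-insensitive comparison assumes the catalog may report function names in lowercase.

```java
import java.util.Arrays;
import java.util.Locale;

// Hypothetical helper, not part of Flink: checks a list of function names.
final class FunctionLookup {
    private FunctionLookup() {}

    // Returns true if functionName occurs in listedFunctions, ignoring case.
    public static boolean isListed(String[] listedFunctions, String functionName) {
        String wanted = functionName.toLowerCase(Locale.ROOT);
        return Arrays.stream(listedFunctions)
                .anyMatch(f -> f.toLowerCase(Locale.ROOT).equals(wanted));
    }

    public static void main(String[] args) {
        // Stand-in data; in a real job the array would come from the table environment.
        String[] listed = {"st_point", "st_astext"};
        System.out.println(isListed(listed, "ST_Point"));
    }
}
```

Assuming the `tableEnv` from the original post, the array could come from `tableEnv.listUserDefinedFunctions()`, and the result could be printed next to `tableEnv.getCurrentCatalog()` and `tableEnv.getCurrentDatabase()` to confirm all three at once.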

On Wed, Apr 28, 2021 at 12:24 PM Shengkai Fang <[hidden email]> wrote:

--
Best regards!
Rui Li

Re: Using Hive UDFs

Youngwoo Kim (김영우)
Thanks Shengkai and Rui for looking into this.

A snippet from my app looks like the following:

    HiveCatalog hive = new HiveCatalog("flink-hive", "default", "/tmp/hive");
    tableEnv.registerCatalog("flink-hive", hive);

    tableEnv.useCatalog("flink-hive");
    tableEnv.loadModule("flink-hive", new HiveModule("3.1.2"));

    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

    tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink_gaia");
    tableEnv.executeSql("USE flink_gaia");
    tableEnv.executeSql("SHOW CURRENT CATALOG").print();
    tableEnv.executeSql("SHOW CURRENT DATABASE").print();
    tableEnv.executeSql("SHOW TABLES").print();
    tableEnv.executeSql("SHOW FUNCTIONS").print();

    // Test Hive UDF
    tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");


And I got the following output and exception:


+----------------------+
| current catalog name |
+----------------------+
|           flink-hive |
+----------------------+
1 row in set

+-----------------------+
| current database name |
+-----------------------+
|            flink_gaia |
+-----------------------+
1 row in set

+----------------------+
|           table name |
+----------------------+
|             geofence |
|                 lcap |
| lcap_temporal_fenced |
+----------------------+

+--------------------------------+
|                  function name |
+--------------------------------+
|                       regr_sxy |
......

380 rows in set


(snip)


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. From line 1, column 18 to line 1, column 31: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)




Thanks,

Youngwoo



On Wed, Apr 28, 2021 at 1:44 PM Rui Li <[hidden email]> wrote:
Hi Youngwoo,

The catalog function is associated with a catalog and DB. Assuming you have created the function ST_Point in your metastore, could you verify whether the current catalog is your HiveCatalog and the current database is the database in which ST_Point is registered?

On Wed, Apr 28, 2021 at 12:24 PM Shengkai Fang <[hidden email]> wrote:
Hi.

The order of the modules may influence how the function is loaded.
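
The two replies above point at module order and at the current catalog and database. As a rough illustration only, here is a toy model in plain Java of that lookup idea. It assumes the order described in the Flink documentation for unqualified names: functions from loaded modules first, in load order, then catalog functions in the current catalog and database. The class `FunctionLookupSketch` and all of its methods are hypothetical and are not Flink APIs; temporary functions are left out, and names are lower-cased as a simplification. It does not diagnose the failure reported in this thread.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Toy model of function lookup. NOT Flink code; every name here is hypothetical. */
class FunctionLookupSketch {
    // Modules in load order: module name -> function names it provides.
    private final Map<String, Set<String>> modules = new LinkedHashMap<>();
    // Catalog functions stored as "catalog.database.function".
    private final Set<String> catalogFunctions = new HashSet<>();
    private String currentCatalog = "default_catalog";
    private String currentDatabase = "default_database";

    public void loadModule(String moduleName, String... functionNames) {
        Set<String> names = new HashSet<>();
        for (String f : functionNames) {
            names.add(f.toLowerCase(Locale.ROOT));
        }
        modules.put(moduleName, names);
    }

    public void createCatalogFunction(String catalog, String database, String function) {
        catalogFunctions.add(key(catalog, database, function));
    }

    public void use(String catalog, String database) {
        currentCatalog = catalog;
        currentDatabase = database;
    }

    /** Returns where an unqualified function name is found, or empty if it is not visible. */
    public Optional<String> resolve(String function) {
        String name = function.toLowerCase(Locale.ROOT);
        // 1. Modules, in the order they were loaded: the first match wins.
        for (Map.Entry<String, Set<String>> module : modules.entrySet()) {
            if (module.getValue().contains(name)) {
                return Optional.of("module " + module.getKey());
            }
        }
        // 2. Catalog functions, but only in the current catalog and database.
        if (catalogFunctions.contains(key(currentCatalog, currentDatabase, name))) {
            return Optional.of("catalog " + currentCatalog + "." + currentDatabase);
        }
        return Optional.empty();
    }

    private static String key(String catalog, String database, String function) {
        return catalog + "." + database + "." + function.toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        FunctionLookupSketch env = new FunctionLookupSketch();
        env.loadModule("core", "upper", "concat");
        env.loadModule("hive", "concat", "regr_sxy");
        env.createCatalogFunction("flink-hive", "flink_gaia", "ST_Point");

        // Both modules provide concat; the one loaded first is used.
        System.out.println(env.resolve("concat"));
        // ST_Point lives in flink-hive.flink_gaia, so it is not visible yet.
        System.out.println(env.resolve("ST_Point"));
        // After switching catalog and database it can be found.
        env.use("flink-hive", "flink_gaia");
        System.out.println(env.resolve("ST_Point"));
    }
}
```

In this model a function registered in one database is invisible from another, which is why the current catalog and database are worth checking first.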



Re: Using Hive UDFs

Rui Li
Hi Youngwoo,

Could you please share the function jar and DDL you used to create the function? I can try reproducing this issue locally.

--
Best regards!
Rui Li

Re: Using Hive UDFs

Youngwoo Kim (김영우)
Hey Rui,

For the geospatial UDFs, I've added these jars to my Flink deployment:

# Flink-Hive
RUN wget -q -O /opt/flink/lib/flink-sql-connector-hive-3.1.2_2.12-1.12.2.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.12.2/flink-sql-connector-hive-3.1.2_2.12-1.12.2.jar \
  && wget -q -O /opt/flink/lib/hive-exec-3.1.2.jar https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.2/hive-exec-3.1.2.jar \
  && wget -q -O /opt/flink/lib/libfb303-0.9.3.jar http://databus.dbpedia.org:8081/repository/internal/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar

# Hive geospatial udf, https://github.com/Esri/spatial-framework-for-hadoop
RUN wget -q -O /opt/flink/lib/spatial-sdk-hive.jar https://github.com/Esri/spatial-framework-for-hadoop/releases/download/v2.2.0/spatial-sdk-hive-2.2.0.jar \
  && wget -q -O /opt/flink/lib/spatial-sdk-json.jar https://github.com/Esri/spatial-framework-for-hadoop/releases/download/v2.2.0/spatial-sdk-json-2.2.0.jar \
  && wget -q -O /opt/flink/lib/esri-geometry-api.jar https://repo1.maven.org/maven2/com/esri/geometry/esri-geometry-api/2.2.4/esri-geometry-api-2.2.4.jar



As I mentioned above, I did not register the functions explicitly because the 'CREATE FUNCTION ...' statement did not work for me. If I run such a statement, e.g. "CREATE FUNCTION ST_GeomFromText AS 'com.esri.hadoop.hive.ST_GeomFromText'", I get:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalogFunction(TableEnvironmentImpl.java:1459)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1009)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
    at com.skt.chiron.FlinkApp.main(FlinkApp.java:58)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    ... 11 more



Thanks,

Youngwoo




at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:244)

at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)

at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)

at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)

at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)

at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1756)

at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1741)

at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:440)

at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4205)

at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3474)

at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)

at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)

at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)

at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)

at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)

at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)

at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)

at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)

... 21 more

Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)

at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)

... 51 more


(snip)


Also, 'SHOW FUNCTIONS' does not list these functions:

tableEnv.executeSql("SHOW FUNCTIONS").print();


......

(snip)



Registering the functions explicitly does not work for me:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)

at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)

at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Caused by: org.apache.flink.table.api.ValidationException: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.

at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalogFunction(TableEnvironmentImpl.java:1459)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1009)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)

at com.skt.chiron.FlinkApp.main(FlinkApp.java:58)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)

... 11 more

(snip)



I'd like to find out why the functions are missing. The Flink (version 1.12.2) job cluster runs on a Kubernetes cluster via the Flink operator, and the standalone metastore serves only the Flink cluster, with no Hive deployment.


Thanks,

Youngwoo




--
Best regards!
Rui Li


--
Best regards!
Rui Li

Re: Using Hive UDFs

Youngwoo Kim (김영우)
Hey Rui,

My bad!
You had already pointed out what I had completely misunderstood: I had been confusing some of the steps for registering UDFs.
Also, my metastore had somehow become a mess. So I cleaned up the metastore and the database, created a database for the Hive catalog, and registered the functions explicitly with the 'CREATE FUNCTION' statement. Now I can see the registered UDFs in the 'SHOW FUNCTIONS' output.

Now I get the expected output from the 'SELECT ...' statement:
SELECT ST_AsText(ST_Point(1, 2))
......

+----+--------------------------------+
| op |                         EXPR$0 |
+----+--------------------------------+
| +I |                    POINT (1 2) |
+----+--------------------------------+
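
The registration step described above can be sketched roughly as follows. This is only an illustration, not the code used in the thread: the class `HiveUdfDdl` and its method are hypothetical, only `com.esri.hadoop.hive.ST_GeomFromText` is confirmed by the thread, and the class names for `ST_Point` and `ST_AsText` are assumed to follow the same package pattern. Each generated string is meant to be passed to `tableEnv.executeSql(...)` while the HiveCatalog and the target database are current.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: builds one CREATE FUNCTION statement per Hive UDF.
final class HiveUdfDdl {

    // Maps function name -> implementing class into plain DDL strings.
    static List<String> createStatements(Map<String, String> udfs) {
        List<String> ddl = new ArrayList<>();
        for (Map.Entry<String, String> e : udfs.entrySet()) {
            ddl.add("CREATE FUNCTION " + e.getKey() + " AS '" + e.getValue() + "'");
        }
        return ddl;
    }

    public static void main(String[] args) {
        Map<String, String> udfs = new LinkedHashMap<>();
        udfs.put("ST_GeomFromText", "com.esri.hadoop.hive.ST_GeomFromText");
        // Assumption: these classes live in the same package as ST_GeomFromText.
        udfs.put("ST_Point", "com.esri.hadoop.hive.ST_Point");
        udfs.put("ST_AsText", "com.esri.hadoop.hive.ST_AsText");

        for (String stmt : createStatements(udfs)) {
            // In the Flink job this would be: tableEnv.executeSql(stmt);
            System.out.println(stmt);
        }
    }
}
```

Running a statement for a function that is already registered would presumably raise the same "already exists" error quoted elsewhere in this thread, so this fits best after the metastore cleanup.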



Sorry for the noise and my tardiness.


Thanks,

Youngwoo



On Wed, Apr 28, 2021 at 3:39 PM Youngwoo Kim (김영우) <[hidden email]> wrote:
Hey Rui,

For the geospatial UDFs, I've added these jars to my Flink deployment:

# Flink-Hive
RUN wget -q -O /opt/flink/lib/flink-sql-connector-hive-3.1.2_2.12-1.12.2.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.12.2/flink-sql-connector-hive-3.1.2_2.12-1.12.2.jar \
  && wget -q -O /opt/flink/lib/hive-exec-3.1.2.jar https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.2/hive-exec-3.1.2.jar \
  && wget -q -O /opt/flink/lib/libfb303-0.9.3.jar http://databus.dbpedia.org:8081/repository/internal/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar

# Hive geospatial udf, https://github.com/Esri/spatial-framework-for-hadoop
RUN wget -q -O /opt/flink/lib/spatial-sdk-hive.jar https://github.com/Esri/spatial-framework-for-hadoop/releases/download/v2.2.0/spatial-sdk-hive-2.2.0.jar \
  && wget -q -O /opt/flink/lib/spatial-sdk-json.jar https://github.com/Esri/spatial-framework-for-hadoop/releases/download/v2.2.0/spatial-sdk-json-2.2.0.jar \
  && wget -q -O /opt/flink/lib/esri-geometry-api.jar https://repo1.maven.org/maven2/com/esri/geometry/esri-geometry-api/2.2.4/esri-geometry-api-2.2.4.jar



As I mentioned above, I did not register the functions explicitly because the 'CREATE FUNCTION ...' statement did not work for me. For example, running "CREATE FUNCTION ST_GeomFromText AS 'com.esri.hadoop.hive.ST_GeomFromText'" fails with:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)

at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)

at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Caused by: org.apache.flink.table.api.ValidationException: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.

at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalogFunction(TableEnvironmentImpl.java:1459)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1009)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)

at com.skt.chiron.FlinkApp.main(FlinkApp.java:58)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)

... 11 more



Thanks,

Youngwoo



On Wed, Apr 28, 2021 at 3:05 PM Rui Li <[hidden email]> wrote:
Hi Youngwoo,

Could you please share the function jar and DDL you used to create the function? I can try reproducing this issue locally.

On Wed, Apr 28, 2021 at 1:33 PM Youngwoo Kim (김영우) <[hidden email]> wrote:
Thanks Shengkai and Rui for looking into this.

A snippet from my app looks like the following:

    HiveCatalog hive = new HiveCatalog("flink-hive", "default", "/tmp/hive");
    tableEnv.registerCatalog("flink-hive", hive);

    tableEnv.useCatalog("flink-hive");
    tableEnv.loadModule("flink-hive", new HiveModule("3.1.2"));

    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

    tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink_gaia");
    tableEnv.executeSql("USE flink_gaia");
    tableEnv.executeSql("SHOW CURRENT CATALOG").print();
    tableEnv.executeSql("SHOW CURRENT DATABASE").print();
    tableEnv.executeSql("SHOW TABLES").print();
    tableEnv.executeSql("SHOW FUNCTIONS").print();

    // Test Hive UDF
    tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");


And I got the following output and exception:


+----------------------+
| current catalog name |
+----------------------+
|           flink-hive |
+----------------------+
1 row in set
+-----------------------+
| current database name |
+-----------------------+
|            flink_gaia |
+-----------------------+
1 row in set
+----------------------+
|           table name |
+----------------------+
|             geofence |
|                 lcap |
| lcap_temporal_fenced |
+----------------------+

+--------------------------------+
|                  function name |
+--------------------------------+
|                       regr_sxy |
......

380 rows in set


(snip)


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. From line 1, column 18 to line 1, column 31: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)

at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)

at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)




Thanks,

Youngwoo



On Wed, Apr 28, 2021 at 1:44 PM Rui Li <[hidden email]> wrote:
Hi Youngwoo,

The catalog function is associated with a catalog and DB. Assuming you have created the function ST_Point in your metastore, could you verify whether the current catalog is your HiveCatalog and the current database is the database in which ST_Point is registered?
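
Since a catalog function is tied to one catalog and one database, one way to rule out a wrong current catalog or database is to call the function by its fully qualified name. The sketch below builds such a reference; the helper class `FunctionRef` is hypothetical, the names `flink-hive`, `flink_gaia`, `ST_Point` and `ST_AsText` come from this thread, and it assumes the planner accepts backtick-quoted `catalog.database.function` references (the hyphen in `flink-hive` makes quoting necessary).

```java
// Hypothetical helper: builds a backtick-quoted catalog.database.function reference.
final class FunctionRef {

    // Quote one identifier part; a literal backtick is escaped by doubling it.
    static String quote(String part) {
        return "`" + part.replace("`", "``") + "`";
    }

    // Join the three parts into a fully qualified function reference.
    static String qualify(String catalog, String database, String function) {
        return quote(catalog) + "." + quote(database) + "." + quote(function);
    }

    public static void main(String[] args) {
        String point = qualify("flink-hive", "flink_gaia", "ST_Point");
        String asText = qualify("flink-hive", "flink_gaia", "ST_AsText");
        // In the Flink job this string would be passed to tableEnv.executeSql(...).
        System.out.println("SELECT " + asText + "(" + point + "(1, 2))");
    }
}
```

If the qualified call resolves while the unqualified one does not, the current catalog or database is the likely culprit; if both fail, the function is probably not registered where expected.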

On Wed, Apr 28, 2021 at 12:24 PM Shengkai Fang <[hidden email]> wrote:
Hi.

The order in which the modules are loaded may affect how the function is resolved.



Re: Using Hive UDFs

Rui Li
Hi Youngwoo,

No problem at all, and I'm glad to know the UDF works now.
Yes, before you can use a Hive UDF, you need to register it in the metastore. That can be done via either Flink or Hive.

Feel free to let me know if you encounter any other issues.
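
On the "already exists" error from earlier in this thread: one possible way to avoid it is to compare the functions you plan to register against the names the catalog already reports (for example, collected from 'SHOW FUNCTIONS'). The sketch below is a hypothetical helper, not part of Flink or Hive; it compares names case-insensitively on the assumption that the metastore may store function names in lower case.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper: finds which wanted UDF names are not registered yet.
final class MissingUdfs {

    // existing: names already reported by the catalog; wanted: names to register.
    static List<String> notYetRegistered(Collection<String> existing, List<String> wanted) {
        Set<String> seen = new HashSet<>();
        for (String name : existing) {
            seen.add(name.toLowerCase(Locale.ROOT));
        }
        List<String> missing = new ArrayList<>();
        for (String name : wanted) {
            // Assumption: function names are matched case-insensitively.
            if (!seen.contains(name.toLowerCase(Locale.ROOT))) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> existing = List.of("st_geomfromtext", "regr_sxy");
        List<String> wanted = List.of("ST_GeomFromText", "ST_Point", "ST_AsText");
        System.out.println(notYetRegistered(existing, wanted));
    }
}
```

Only the names returned here would then need a 'CREATE FUNCTION' statement.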

On Wed, Apr 28, 2021 at 4:28 PM Youngwoo Kim (김영우) <[hidden email]> wrote:
Hey Rui,

My bad!
You had already pointed out what I had completely misunderstood: I had been confusing some of the steps for registering UDFs.
Also, my metastore had somehow become a mess. So I cleaned up the metastore and the database, created a database for the Hive catalog, and registered the functions explicitly with the 'CREATE FUNCTION' statement. Now I can see the registered UDFs in the 'SHOW FUNCTIONS' output.

Now I get the expected output from the 'SELECT ...' statement:
SELECT ST_AsText(ST_Point(1, 2))
......

+----+--------------------------------+
| op |                         EXPR$0 |
+----+--------------------------------+
| +I |                    POINT (1 2) |
+----+--------------------------------+



Sorry for the noise and my tardiness.


Thanks,

Youngwoo



On Wed, Apr 28, 2021 at 3:39 PM Youngwoo Kim (김영우) <[hidden email]> wrote:
Hey Rui,

For the geospatial UDFs, I've added these jars to my Flink deployment:

# Flink-Hive
RUN wget -q -O /opt/flink/lib/flink-sql-connector-hive-3.1.2_2.12-1.12.2.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.12.2/flink-sql-connector-hive-3.1.2_2.12-1.12.2.jar \
  && wget -q -O /opt/flink/lib/hive-exec-3.1.2.jar https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.2/hive-exec-3.1.2.jar \
  && wget -q -O /opt/flink/lib/libfb303-0.9.3.jar http://databus.dbpedia.org:8081/repository/internal/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar

# Hive geospatial udf, https://github.com/Esri/spatial-framework-for-hadoop
RUN wget -q -O /opt/flink/lib/spatial-sdk-hive.jar https://github.com/Esri/spatial-framework-for-hadoop/releases/download/v2.2.0/spatial-sdk-hive-2.2.0.jar \
  && wget -q -O /opt/flink/lib/spatial-sdk-json.jar https://github.com/Esri/spatial-framework-for-hadoop/releases/download/v2.2.0/spatial-sdk-json-2.2.0.jar \
  && wget -q -O /opt/flink/lib/esri-geometry-api.jar https://repo1.maven.org/maven2/com/esri/geometry/esri-geometry-api/2.2.4/esri-geometry-api-2.2.4.jar



As I mentioned above, I did not register the functions explicitly because the 'CREATE FUNCTION ...' statement did not work for me. For example, running "CREATE FUNCTION ST_GeomFromText AS 'com.esri.hadoop.hive.ST_GeomFromText'" fails with:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)

at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)

at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Caused by: org.apache.flink.table.api.ValidationException: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.

at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalogFunction(TableEnvironmentImpl.java:1459)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1009)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)

at com.skt.chiron.FlinkApp.main(FlinkApp.java:58)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)

... 11 more



Thanks,

Youngwoo



On Wed, Apr 28, 2021 at 3:05 PM Rui Li <[hidden email]> wrote:
Hi Youngwoo,

Could you please share the function jar and DDL you used to create the function? I can try reproducing this issue locally.

On Wed, Apr 28, 2021 at 1:33 PM Youngwoo Kim (김영우) <[hidden email]> wrote:
Thanks Shengkai and Rui for looking into this.

A snippet from my app looks like the following:

    HiveCatalog hive = new HiveCatalog("flink-hive", "default", "/tmp/hive");
    tableEnv.registerCatalog("flink-hive", hive);

    tableEnv.useCatalog("flink-hive");
    tableEnv.loadModule("flink-hive", new HiveModule("3.1.2"));

    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

    tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink_gaia");
    tableEnv.executeSql("USE flink_gaia");
    tableEnv.executeSql("SHOW CURRENT CATALOG").print();
    tableEnv.executeSql("SHOW CURRENT DATABASE").print();
    tableEnv.executeSql("SHOW TABLES").print();
    tableEnv.executeSql("SHOW FUNCTIONS").print();

    // Test Hive UDF
    tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");


And I got the following output and exception:


+----------------------+
| current catalog name |
+----------------------+
|           flink-hive |
+----------------------+
1 row in set
+-----------------------+
| current database name |
+-----------------------+
|            flink_gaia |
+-----------------------+
1 row in set
+----------------------+
|           table name |
+----------------------+
|             geofence |
|                 lcap |
| lcap_temporal_fenced |
+----------------------+

+--------------------------------+
|                  function name |
+--------------------------------+
|                       regr_sxy |
......

380 rows in set


(snip)


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. From line 1, column 18 to line 1, column 31: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)




Thanks,

Youngwoo



On Wed, Apr 28, 2021 at 1:44 PM Rui Li <[hidden email]> wrote:
Hi Youngwoo,

The catalog function is associated with a catalog and DB. Assuming you have created the function ST_Point in your metastore, could you verify whether the current catalog is your HiveCatalog and the current database is the database in which ST_Point is registered?
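One way to take the current catalog and database out of the picture is to call the function by its fully qualified name. Flink SQL is documented to accept precise function references of the form catalog.database.function. The sketch below only builds such a query string using the catalog and database names printed earlier in this thread; the class and helper names are made up for illustration, and whether the qualified call resolves here is not established.

```java
public class QualifiedFunctionCall {

    // Quote an identifier with backticks (needed for names such as
    // flink-hive that contain a hyphen), doubling any embedded backtick.
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    // Build catalog.database.function with every part quoted.
    static String qualify(String catalog, String database, String function) {
        return quote(catalog) + "." + quote(database) + "." + quote(function);
    }

    // Rebuild the sanity-check query from this thread with qualified names.
    static String sanityCheckQuery(String catalog, String database) {
        String stAsText = qualify(catalog, database, "ST_AsText");
        String stPoint = qualify(catalog, database, "ST_Point");
        return "SELECT " + stAsText + "(" + stPoint + "(1, 2))";
    }

    public static void main(String[] args) {
        // Catalog and database names are taken from the output in this thread.
        System.out.println(sanityCheckQuery("flink-hive", "flink_gaia"));
    }
}
```

The resulting string would be passed to tableEnv.executeSql(...) in place of the unqualified query.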

On Wed, Apr 28, 2021 at 12:24 PM Shengkai Fang <[hidden email]> wrote:
Hi.

The order in which the modules are loaded may affect how the function is resolved.


On Wed, Apr 28, 2021 at 10:50 AM Youngwoo Kim (김영우) <[hidden email]> wrote:
Hi,

I've configured a Hive metastore and use HiveCatalog in a streaming application. So far, most of the Hive integration features are working fine.

However, I have a problem using Hive UDFs. I have already completed the prerequisites for using the Hive geospatial UDFs[1].

As a sanity check, I ran the following query:

tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");


It failed with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. From line 1, column 18 to line 1, column 63: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 18 to line 1, column 63: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
    at com.skt.chiron.FlinkApp.main(FlinkApp.java:67)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    ... 11 more
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 18 to line 1, column 63: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5043)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1838)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:321)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1756)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1741)
    at org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:606)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:244)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1756)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1741)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:440)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4205)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3474)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
    ... 21 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
    ... 51 more


(snip)


Also, these functions do not appear in the output of 'SHOW FUNCTIONS':

tableEnv.executeSql("SHOW FUNCTIONS").print();


......

(snip)



Registering the functions explicitly does not work for me either; it fails because the function already exists in the catalog:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalogFunction(TableEnvironmentImpl.java:1459)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1009)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
    at com.skt.chiron.FlinkApp.main(FlinkApp.java:58)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    ... 11 more

(snip)
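The "already exists" error above comes from a plain CREATE FUNCTION on a name the catalog already holds. Flink's CREATE FUNCTION statement accepts an IF NOT EXISTS clause, so a registration step could be made repeatable along the lines of the sketch below. It only builds the DDL strings; the com.esri.hadoop.hive.* class names are an assumption (the message does not say which classes back the geospatial UDFs), and the class and helper names are made up for illustration. This avoids the duplicate-registration error only and does not by itself explain why ST_Point fails to resolve.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CreateFunctionDdl {

    // Build a CREATE FUNCTION statement that is a no-op when the
    // function is already registered in the current catalog and database.
    static String createIfNotExists(String functionName, String className) {
        return "CREATE FUNCTION IF NOT EXISTS " + functionName
                + " AS '" + className + "'";
    }

    public static void main(String[] args) {
        // Function name -> implementing class. The class names are assumed,
        // not taken from the thread; adjust them to the UDF jar actually in use.
        Map<String, String> udfs = new LinkedHashMap<>();
        udfs.put("ST_GeomFromText", "com.esri.hadoop.hive.ST_GeomFromText");
        udfs.put("ST_Point", "com.esri.hadoop.hive.ST_Point");
        udfs.put("ST_AsText", "com.esri.hadoop.hive.ST_AsText");

        // Each printed statement would be passed to tableEnv.executeSql(...).
        for (Map.Entry<String, String> udf : udfs.entrySet()) {
            System.out.println(createIfNotExists(udf.getKey(), udf.getValue()));
        }
    }
}
```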



I hope to find out why the functions are missing. The Flink (version 1.12.2) job cluster runs on a Kubernetes cluster via the Flink operator, and a standalone metastore runs solely for the Flink cluster, without a Hive deployment.


Thanks,

Youngwoo




--
Best regards!
Rui Li

