Hi,
I've configured the Hive metastore to use HiveCatalog in a streaming application. So far, most of the Hive integration features are working fine. However, I have a problem using Hive UDFs. I've already completed the prerequisites for using the Hive geospatial UDFs [1]. As a sanity check, I ran the query below:

tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");

and got an exception like this:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. From line 1, column 18 to line 1, column 63: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed.
From line 1, column 18 to line 1, column 63: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
    at com.skt.chiron.FlinkApp.main(FlinkApp.java:67)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    ... 11 more
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 18 to line 1, column 63: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5043)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1838)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:321)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1756)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1741)
    at org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:606)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:244)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1756)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1741)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:440)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4205)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3474)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
    ... 21 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
    ... 51 more
(snip)

Also, the functions do not appear in the output of 'SHOW FUNCTIONS':
tableEnv.executeSql("SHOW FUNCTIONS").print();
......
(snip)

Registering the functions explicitly does not work for me either:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalogFunction(TableEnvironmentImpl.java:1459)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1009)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
    at com.skt.chiron.FlinkApp.main(FlinkApp.java:58)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    ... 11 more
(snip)

I hope to find out why the functions are missing. The Flink (1.12.2) job cluster is running on a Kubernetes cluster via the Flink operator, and the standalone metastore serves only the Flink cluster, with no Hive deployments.

Thanks,
Youngwoo
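P.S. For reference, the explicit registration I attempted was of this form (a sketch only: the class names come from the Esri spatial-framework-for-hadoop jar, and I have not verified whether the 'IF NOT EXISTS' guard avoids the "already exists" error above):

```sql
-- Register two of the Esri geospatial UDFs in the current catalog/database.
-- Class names are from spatial-framework-for-hadoop; adjust to your jar.
CREATE FUNCTION IF NOT EXISTS ST_Point AS 'com.esri.hadoop.hive.ST_Point';
CREATE FUNCTION IF NOT EXISTS ST_AsText AS 'com.esri.hadoop.hive.ST_AsText';

-- Sanity check:
SELECT ST_AsText(ST_Point(1, 2));
```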
Hi. The order of the loaded modules may influence how the function is resolved. Youngwoo Kim (김영우) <[hidden email]> wrote on Wed, Apr 28, 2021 at 10:50 AM:
Hi Youngwoo, The catalog function is associated with a catalog and DB. Assuming you have created the function ST_Point in your metastore, could you verify whether the current catalog is your HiveCatalog and the current database is the database in which ST_Point is registered? On Wed, Apr 28, 2021 at 12:24 PM Shengkai Fang <[hidden email]> wrote:
Best regards!
Rui Li
Thanks Shengkai and Rui for looking into this. A snippet from my app looks like the following:

HiveCatalog hive = new HiveCatalog("flink-hive", "default", "/tmp/hive");
tableEnv.registerCatalog("flink-hive", hive);
tableEnv.useCatalog("flink-hive");
tableEnv.loadModule("flink-hive", new HiveModule("3.1.2"));
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink_gaia");
tableEnv.executeSql("USE flink_gaia");

tableEnv.executeSql("SHOW CURRENT CATALOG").print();
tableEnv.executeSql("SHOW CURRENT DATABASE").print();
tableEnv.executeSql("SHOW TABLES").print();
tableEnv.executeSql("SHOW FUNCTIONS").print();
// Test Hive UDF
tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");

And I got the following output and exception:

+----------------------+
| current catalog name |
+----------------------+
|           flink-hive |
+----------------------+
1 row in set

+-----------------------+
| current database name |
+-----------------------+
|            flink_gaia |
+-----------------------+
1 row in set

+----------------------+
|           table name |
+----------------------+
|             geofence |
|                 lcap |
| lcap_temporal_fenced |
+----------------------+

+--------------------------------+
|                  function name |
+--------------------------------+
|                       regr_sxy |
......
380 rows in set
(snip)

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. From line 1, column 18 to line 1, column 31: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Thanks,
Youngwoo

On Wed, Apr 28, 2021 at 1:44 PM Rui Li <[hidden email]> wrote:
Hi Youngwoo, On Wed, Apr 28, 2021 at 1:33 PM Youngwoo Kim (김영우) <[hidden email]> wrote:
Best regards!
Rui Li
Hey Rui,

For the geospatial UDFs, I've added these jars to my Flink deployment:

# Flink-Hive
RUN wget -q -O /opt/flink/lib/flink-sql-connector-hive-3.1.2_2.12-1.12.2.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.12.2/flink-sql-connector-hive-3.1.2_2.12-1.12.2.jar \
 && wget -q -O /opt/flink/lib/hive-exec-3.1.2.jar https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.2/hive-exec-3.1.2.jar \
 && wget -q -O /opt/flink/lib/libfb303-0.9.3.jar http://databus.dbpedia.org:8081/repository/internal/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar

# Hive geospatial UDFs, https://github.com/Esri/spatial-framework-for-hadoop
RUN wget -q -O /opt/flink/lib/spatial-sdk-hive.jar https://github.com/Esri/spatial-framework-for-hadoop/releases/download/v2.2.0/spatial-sdk-hive-2.2.0.jar \
 && wget -q -O /opt/flink/lib/spatial-sdk-json.jar https://github.com/Esri/spatial-framework-for-hadoop/releases/download/v2.2.0/spatial-sdk-json-2.2.0.jar \
 && wget -q -O /opt/flink/lib/esri-geometry-api.jar https://repo1.maven.org/maven2/com/esri/geometry/esri-geometry-api/2.2.4/esri-geometry-api-2.2.4.jar

As I mentioned above, I did not register the functions explicitly, because the 'CREATE FUNCTION ...' statement did not work for me. If I run, e.g., "CREATE FUNCTION ST_GeomFromText AS 'com.esri.hadoop.hive.ST_GeomFromText'", I get:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalogFunction(TableEnvironmentImpl.java:1459)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1009)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
    at com.skt.chiron.FlinkApp.main(FlinkApp.java:58)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    ... 11 more

Thanks,
Youngwoo

On Wed, Apr 28, 2021 at 3:05 PM Rui Li <[hidden email]> wrote:
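P.S. One workaround I might try for the "already exists" error (a sketch, not verified here: it assumes the stale entry lives in the flink_gaia database of the current catalog):

```sql
-- Remove the stale registration, then re-create it against the Esri class.
DROP FUNCTION IF EXISTS flink_gaia.ST_GeomFromText;
CREATE FUNCTION flink_gaia.ST_GeomFromText AS 'com.esri.hadoop.hive.ST_GeomFromText';
```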
Hey Rui,

My bad! You had already pointed out what I completely misunderstood: I was confusing some of the steps for registering UDFs, and somehow my metastore was a mess as well. So I cleaned up the metastore and its database, created a database for the Hive catalog, and registered the functions explicitly with 'CREATE FUNCTION' statements. At this point I can see the registered UDFs in the 'SHOW FUNCTIONS' output, and the 'SELECT ...' statement now returns the expected result:

SELECT ST_AsText(ST_Point(1, 2))
......
+----+--------------------------------+
| op |                         EXPR$0 |
+----+--------------------------------+
| +I |                    POINT (1 2) |
+----+--------------------------------+

Sorry for the noise and my tardiness.

Thanks,
Youngwoo

On Wed, Apr 28, 2021 at 3:39 PM Youngwoo Kim (김영우) <[hidden email]> wrote:
Hi Youngwoo,

That's no problem at all, and I'm glad to hear the UDF works now. Yes, before you can use a Hive UDF you need to register it in the metastore, and that can be done via either Flink or Hive. Feel free to let me know if you run into any other issues.

On Wed, Apr 28, 2021 at 4:28 PM Youngwoo Kim (김영우) <[hidden email]> wrote:
Best regards!
Rui Li
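P.S. For anyone finding this thread later, the sequence that ended up working can be sketched roughly as follows (a sketch under assumptions: a HiveCatalog is the current catalog, the Esri jars are on the classpath, and the class names are taken from spatial-framework-for-hadoop):

```sql
CREATE DATABASE IF NOT EXISTS flink_gaia;
USE flink_gaia;

-- Register the UDFs once; catalog functions are persisted in the Hive metastore.
CREATE FUNCTION ST_Point AS 'com.esri.hadoop.hive.ST_Point';
CREATE FUNCTION ST_AsText AS 'com.esri.hadoop.hive.ST_AsText';

-- Verify registration, then use the functions.
SHOW FUNCTIONS;
SELECT ST_AsText(ST_Point(1, 2));
```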