PyFlink UDF: No match found for function signature XXX

PyFlink UDF: No match found for function signature XXX

Yik San Chan
Hi,

I have a PyFlink script that fails when calling a simple UDF. The full script is below:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
    DataTypes,
    EnvironmentSettings,
    SqlDialect,
    StreamTableEnvironment,
)
from pyflink.table.udf import udf


@udf(
    input_types=[DataTypes.INT(), DataTypes.INT()],
    result_type=DataTypes.BIGINT(),
)
def add(i, j):
    return i + j


TRANSFORM = """
INSERT INTO aiinfra.mysink
SELECT ADD(a, b)
FROM aiinfra.mysource
"""

CREATE_CATALOG = """
CREATE CATALOG hive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/data/software/hive-2.1.0/conf'
)"""

USE_CATALOG = "USE CATALOG hive"


exec_env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(
    stream_execution_environment=exec_env, environment_settings=env_settings
)

t_env.create_temporary_function("add", add)

t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
t_env.execute_sql(CREATE_CATALOG)
t_env.execute_sql(USE_CATALOG)

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_result = t_env.execute_sql(TRANSFORM)
```

However, when I submit the Python file to my Flink cluster, it throws an exception:

```
[INFO] 2021-05-18 17:27:47.758  - [taskAppId=TASK-90019-86729-380519]:[152] -  -> Traceback (most recent call last):
 File "aiinfra/batch_example.py", line 50, in <module>
   t_result = t_env.execute_sql(TRANSFORM)
 File "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 766, in execute_sql
 File "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
 File "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
 File "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 3, column 8 to line 3, column 16: No match found for function signature ADD(<NUMERIC>, <NUMERIC>)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:193)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:536)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:659)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
```

It seems the planner cannot see that the "add" function has been registered. Changing "ADD(a, b)" to "add(a, b)" doesn't help, so I don't think it is a case-sensitivity issue.

Also, if I replace "ADD(a, b)" with a plain "a + b", the script produces exactly what I need.

Regarding aiinfra.mysource and aiinfra.mysink: aiinfra.mysource has two columns, `a bigint` and `b bigint`; aiinfra.mysink has one column, `c bigint`.
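
For reference, the DDL of the two tables is roughly equivalent to the following sketch (Hive dialect; storage and partition details omitted):

```python
# Hypothetical reproduction of the table schemas described above.
t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
t_env.execute_sql("CREATE TABLE IF NOT EXISTS aiinfra.mysource (a BIGINT, b BIGINT)")
t_env.execute_sql("CREATE TABLE IF NOT EXISTS aiinfra.mysink (c BIGINT)")
```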

Any help? Thanks!

Best,
Yik San

Re: PyFlink UDF: No match found for function signature XXX

Dian Fu
Hi Yik San,

The declared input types for `add` are DataTypes.INT(), but the schema of aiinfra.mysource is `a bigint, b bigint`, so the call ADD(a, b) does not match the registered signature.

Regards,
Dian

Re: PyFlink UDF: No match found for function signature XXX

Yik San Chan
Hi Dian,

I changed the UDF to:

```python
@udf(
    input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
    result_type=DataTypes.BIGINT(),
)
def add(i, j):
    return i + j
```

But I still get the same error.

Re: PyFlink UDF: No match found for function signature XXX

Yik San Chan
With help from Dian and friends, it turns out the root cause is this:

When the script calls `create_temporary_function`, the current catalog is still the default catalog, so the function is registered there. By the time it calls `execute_sql(TRANSFORM)`, the current catalog has been switched to "hive". A temporary function registered in the "default" catalog is not accessible from the "hive" catalog.
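
The scoping is easy to observe (a sketch, assuming `list_user_defined_functions()` reflects the current catalog and database):

```python
t_env.create_temporary_function("add", add)
# While the current catalog is still the default one, the function is visible.
print(t_env.list_user_defined_functions())  # [..., 'add']

t_env.execute_sql("USE CATALOG hive")
# After switching catalogs, the temporary catalog function no longer resolves.
print(t_env.list_user_defined_functions())  # 'add' is gone
```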

To solve the problem, replace `create_temporary_function` with `create_temporary_system_function`, which registers the function as a temporary system function that is accessible regardless of the current catalog.
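
Concretely, the fix is a one-line change:

```python
# Register the UDF as a temporary *system* function instead of a temporary
# catalog function; system functions are not tied to any catalog, so the
# call still resolves after `USE CATALOG hive`.
t_env.create_temporary_system_function("add", add)
```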


Best,
Yik San
