(This question is cross-posted on StackOverflow https://stackoverflow.com/questions/66756612/failed-to-unit-test-pyflink-udf)
I am using PyFlink and I want to unit test my UDF written in Python. To test the simple udf below:

```python
# tasks/helloworld/udf.py
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.INT(), DataTypes.INT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j
```

I create a test file that should fail:

```python
from tasks.helloworld.udf import add


def test_add():
    assert add(1,1) == 3
```

Sadly, it passes if I run `pytest`:

```
> pytest
============================= test session starts ==============================
platform darwin -- Python 3.7.10, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
rootdir: /Users/chenyisheng/source/yiksanchan/pytest-flink
collected 1 item

tests/test_helloworld.py .                                                [100%]

=============================== warnings summary ===============================
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
  /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
    from collections import (

../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/udf.py:291
  /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/udf.py:291: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
    if not isinstance(input_types, collections.Iterable) \

-- Docs: https://docs.pytest.org/en/stable/warnings.html
======================== 1 passed, 6 warnings in 0.98s =========================
```

However, the test will fail as expected if I remove the `@udf(input_types=[...], result_type=...)` decorator:

```
> pytest
============================= test session starts ==============================
platform darwin -- Python 3.7.10, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
rootdir: /Users/chenyisheng/source/yiksanchan/pytest-flink
collected 1 item

tests/test_helloworld.py F                                                [100%]

=================================== FAILURES ===================================
___________________________________ test_add ___________________________________

    def test_add():
>       assert add(1,1) == 3
E       assert 2 == 3
E        +  where 2 = add(1, 1)

tests/test_helloworld.py:4: AssertionError
=============================== warnings summary ===============================
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
  /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
    from collections import (

-- Docs: https://docs.pytest.org/en/stable/warnings.html
=========================== short test summary info ============================
FAILED tests/test_helloworld.py::test_add - assert 2 == 3
======================== 1 failed, 5 warnings in 0.17s =========================
```

The full example can be found at https://github.com/YikSanChan/how-to-pytest-flink.

Best, Yik San
|
Hi Yik San, As the udf `add` is decorated with the `@udf` decorator, `add` no longer refers to a plain Python function. If you execute `print(type(add(1, 1)))`, you will see that the output is something like "<class 'pyflink.table.expression.Expression'>". You could try the following code: `assert add._func(1, 1) == 3`. `add._func` holds the original Python function. Regards, Dian On Tue, Mar 23, 2021 at 10:56 AM Yik San Chan <[hidden email]> wrote:
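A likely reason the decorated test passes, sketched without PyFlink: if calling the decorated `add` returns an expression object whose `==` builds another expression rather than a `bool`, then `assert add(1,1) == 3` asserts on an object that is truthy by default. The names `FakeExpression`, `FakeUdfWrapper`, and `fake_udf` below are hypothetical stand-ins, not PyFlink classes. They only mimic the behavior described in this thread, on the assumption that PyFlink's `Expression` overloads `==` in this way.

```python
class FakeExpression:
    """Hypothetical stand-in for a lazily evaluated expression object."""

    def __init__(self, description):
        self.description = description

    def __eq__(self, other):
        # Build a new expression instead of returning True or False.
        return FakeExpression(f"({self.description} == {other!r})")


class FakeUdfWrapper:
    """Hypothetical stand-in for the object a UDF decorator returns."""

    def __init__(self, func):
        # Keep the original function, as the `_func` attribute does in the thread.
        self._func = func

    def __call__(self, *args):
        # Calling the wrapper builds an expression; it does not run the function.
        return FakeExpression(f"{self._func.__name__}{args!r}")


def fake_udf(func):
    return FakeUdfWrapper(func)


@fake_udf
def add(i, j):
    return i + j


comparison = add(1, 1) == 3
print(type(comparison).__name__)  # FakeExpression, not bool
print(bool(comparison))           # True, so a bare `assert` on it passes
print(add._func(1, 1))            # 2, the real result of the wrapped function
```

With this stand-in, `assert add(1, 1) == 3` passes for the same reason, while `assert add._func(1, 1) == 3` fails as the original test intended.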
|
Hi Dian, Thanks! It solves my problem. Best, Yik San On Tue, Mar 23, 2021 at 1:29 PM Dian Fu <[hidden email]> wrote:
|
Hi Dian, The `._func` attribute seems to be internal only. Maybe we can add a public-facing method to make it more intuitive to use in unit tests? What do you think? Best, Yik San On Tue, Mar 23, 2021 at 2:02 PM Yik San Chan <[hidden email]> wrote:
|
Hi Yik San, This field isn't expected to be exposed to users and so I'm not convinced that we should add such an interface/method in Flink. Regards, Dian On Tue, Mar 23, 2021 at 2:04 PM Yik San Chan <[hidden email]> wrote:
|
Hi Dian, However, users do want to unit test their UDFs, as supported in https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-user-defined-functions Even though the examples are for Flink, I believe PyFlink should ideally be no different. What do you think? Best, Yik San On Tue, Mar 23, 2021 at 2:19 PM Dian Fu <[hidden email]> wrote:
|
As I replied in the previous email, this doesn’t prevent users from writing tests for PyFlink UDFs. Users can use `._func` to access the original Python function if they want.
Regards, Dian
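One way to keep the dependence on `._func` in a single place is a small test helper. This is only a sketch: `unwrap_udf` is a hypothetical name, not a PyFlink API, and it relies on the internal `_func` attribute mentioned in this thread, which may change between releases. The `add` below is defined as a plain function so that the snippet runs without PyFlink; in the real project it would be imported from `tasks.helloworld.udf`.

```python
def unwrap_udf(candidate):
    """Return the plain Python callable behind a decorated UDF.

    Hypothetical helper: falls back to `candidate` itself when there is no
    `_func` attribute, so it also accepts undecorated functions.
    """
    return getattr(candidate, "_func", candidate)


# Plain stand-in for the UDF body (assumption: the decorated version exposes
# this same function through `_func`).
def add(i, j):
    return i + j


def test_add():
    # Exercise the business logic directly, bypassing the expression wrapper.
    assert unwrap_udf(add)(1, 1) == 2
```

If the attribute is renamed or replaced by a public accessor, only `unwrap_udf` needs to change, not every test.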
|
Hi Dian, As you said, users can, but I got the impression that using `._func` to access the original Python function is not recommended and therefore not documented, whereas in Flink, unit testing a Scala/Java UDF is clearly documented and encouraged. Am I misreading something? Best, Yik San On Wed, Mar 24, 2021 at 10:21 AM Dian Fu <[hidden email]> wrote:
|
That’s good advice. I have created ticket https://issues.apache.org/jira/browse/FLINK-21938 to track this.
|
Hi Dian, Thanks for your patience on all these asks! Best, Yik San On Wed, Mar 24, 2021 at 10:32 AM Dian Fu <[hidden email]> wrote:
|