Pyflink UDF with ARRAY as input

Pyflink UDF with ARRAY as input

Barth, Torben

Dear List,

I have a table with the following structure:

my_table
-- Key: STRING
-- List_element: ARRAY<ROW<`integer_element` INT, `string_element` STRING>>

I want to define a UDF that extracts information from "List_element", but I cannot access the contents of the array inside the UDF. I tried something like:

    @udf(result_type=DataTypes.STRING())
    def get_string_element(my_list):
        my_string = 'xxx'
        for element in my_list:
            if element.integer_element == 2:
                my_string = element.string_element
        return my_string

    table_env.create_temporary_function("get_string_element", get_string_element)

    # use the function in Python Table API
    my_table.select("get_string_element(List_element)")

Unfortunately, I cannot get it to work. Does anybody know the correct way to extract this information?

Any comments or ideas are very welcome.

Thanks

Torben


Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main, Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main: HRB 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender des Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen; Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller, Dr. Pierre Dominique Prümm, Dr. Matthias Zieschang

Re: Pyflink UDF with ARRAY as input

Xingbo Huang
Hi Torben,

It is indeed a bug, and I have created a JIRA issue [1]. As a workaround, access the fields of each row by index instead of by name (the example below is written against release-1.12):

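    # Imports this snippet needs (module paths as of release-1.12).
    from pyflink.common import Row
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
    from pyflink.table.udf import udf

    # Streaming table environment on the blink planner.
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(env,
                                          environment_settings=EnvironmentSettings.new_instance()
                                          .in_streaming_mode().use_blink_planner().build())

    @udf(result_type=DataTypes.STRING())
    def get_string_element(my_list):
        my_string = 'xxx'
        for element in my_list:
            # Workaround: index 0 is integer_element, index 1 is string_element.
            if element[0] == 2:
                my_string = element[1]
        return my_string

    # Sample table with the same schema as my_table.
    t = t_env.from_elements(
        [("1", [Row(3, "flink")]), ("3", [Row(2, "pyflink")]), ("2", [Row(2, "python")])],
        DataTypes.ROW(
            [DataTypes.FIELD("Key", DataTypes.STRING()),
             DataTypes.FIELD("List_element",
                             DataTypes.ARRAY(DataTypes.ROW(
                                 [DataTypes.FIELD("integer_element", DataTypes.INT()),
                                  DataTypes.FIELD("string_element", DataTypes.STRING())])))]))
    print(t.select(get_string_element(t.List_element)).to_pandas())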
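
To check the index-based logic of the workaround without starting a Flink job, the loop can be exercised on plain Python data. This is only a sketch: plain tuples stand in for the rows of the array, and the function name `get_string_element_logic` and its `wanted` and `default` parameters are illustrative assumptions, not part of PyFlink.

```python
def get_string_element_logic(my_list, wanted=2, default="xxx"):
    """Return the string field of the last element whose integer field equals `wanted`."""
    result = default
    for element in my_list:
        # Positional access, as in the workaround:
        # index 0 is integer_element, index 1 is string_element.
        if element[0] == wanted:
            result = element[1]
    return result


# Same sample data as in the from_elements call above, with tuples instead of Row.
rows = [
    ("1", [(3, "flink")]),
    ("3", [(2, "pyflink")]),
    ("2", [(2, "python")]),
]
results = [get_string_element_logic(elements) for _, elements in rows]
print(results)  # ['xxx', 'pyflink', 'python']
```

Note that if several elements of one array match, the last one wins, because the loop keeps overwriting the result.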



[1] https://issues.apache.org/jira/browse/FLINK-20666
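
If the same UDF body has to run both where name-based access works and where only index-based access works, the access style could be hidden behind a small helper. The following is a rough sketch under assumptions: `field_of` and `pick_string` are hypothetical names, and it assumes that name-based access fails with an `AttributeError` when field names are not available, which may not match how every PyFlink version behaves.

```python
from collections import namedtuple


def field_of(element, name, index):
    """Hypothetical helper: read a field by name, falling back to its position."""
    try:
        return getattr(element, name)
    except AttributeError:
        # Field names are not available on this object; use the position instead.
        return element[index]


def pick_string(my_list, wanted=2, default="xxx"):
    """Same selection logic as the UDF, written against field_of."""
    result = default
    for element in my_list:
        if field_of(element, "integer_element", 0) == wanted:
            result = field_of(element, "string_element", 1)
    return result


# A namedtuple stands in for a row with field names, a plain tuple for one without.
Named = namedtuple("Named", ["integer_element", "string_element"])
print(pick_string([Named(2, "by-name")]))  # by-name
print(pick_string([(2, "by-index")]))      # by-index
print(pick_string([(3, "no-match")]))      # xxx
```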

Best,
Xingbo

Barth, Torben <[hidden email]> wrote on Fri, Dec 18, 2020 at 2:46 AM:
