Dear List,

I have a table with the following structure:

my_table
-- Key: String
-- List_element: ARRAY<ROW<`integer_element` INT, `string_element` STRING>>

I want to define a UDF that extracts information from "List_element", but I cannot access the contents of the array inside the UDF. I tried something like:

    @udf(result_type=DataTypes.STRING())
    def get_string_element(my_list):
        my_string = 'xxx'
        for element in my_list:
            if element.integer_element == 2:
                my_string = element.string_element
        return my_string

    table_env.create_temporary_function("get_string_element", get_string_element)
    # use the function in Python Table API
    my_table.select("get_string_element(List_element)")

Unfortunately, I cannot get it to work. Does anybody know the correct way to extract this information? Any comments or ideas are very welcome.

Thanks,
Torben

Hi Torben,

It is indeed a bug, and I have created a JIRA issue [1]. The workaround is to access the fields of each row by index instead of by name (written against release-1.12):

    # Imports (assumed for release-1.12)
    from pyflink.common import Row
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
    from pyflink.table.udf import udf

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode().use_blink_planner().build())

    @udf(result_type=DataTypes.STRING())
    def get_string_element(my_list):
        my_string = 'xxx'
        for element in my_list:
            # element[0] is integer_element, element[1] is string_element
            if element[0] == 2:
                my_string = element[1]
        return my_string

    t = t_env.from_elements(
        [("1", [Row(3, "flink")]),
         ("3", [Row(2, "pyflink")]),
         ("2", [Row(2, "python")])],
        DataTypes.ROW(
            [DataTypes.FIELD("Key", DataTypes.STRING()),
             DataTypes.FIELD("List_element", DataTypes.ARRAY(DataTypes.ROW(
                 [DataTypes.FIELD("integer_element", DataTypes.INT()),
                  DataTypes.FIELD("string_element", DataTypes.STRING())])))]))

    print(t.select(get_string_element(t.List_element)).to_pandas())

[1] https://issues.apache.org/jira/browse/FLINK-20666

Best,
Xingbo

Barth, Torben <[hidden email]> wrote on Fri, Dec 18, 2020 at 2:46 AM:
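
The index-based lookup in the reply can be tried without a Flink cluster. The following is only a minimal sketch: plain tuples stand in for the rows of `List_element`, and the names `pick_string_element`, `INTEGER_ELEMENT`, `STRING_ELEMENT`, `wanted`, and `default` are hypothetical helpers introduced here for illustration, not part of PyFlink. It assumes the field order `integer_element`, `string_element` from the table definition.

```python
from typing import Optional, Sequence, Tuple

# Hypothetical stand-in for ROW<integer_element INT, string_element STRING>.
# A plain tuple is used so the sketch runs without PyFlink installed.
RowLike = Tuple[int, str]

INTEGER_ELEMENT = 0  # position of `integer_element` in each row
STRING_ELEMENT = 1   # position of `string_element` in each row


def pick_string_element(my_list: Optional[Sequence[RowLike]],
                        wanted: int = 2,
                        default: str = "xxx") -> str:
    """Return string_element of the last row whose integer_element == wanted.

    Fields are read by position, mirroring the element[0] / element[1]
    workaround. A missing (None) or empty array yields `default`.
    """
    result = default
    for element in my_list or []:
        if element[INTEGER_ELEMENT] == wanted:
            result = element[STRING_ELEMENT]
    return result
```

With the sample data from the reply, `pick_string_element([(3, "flink")])` returns `"xxx"` and `pick_string_element([(2, "pyflink")])` returns `"pyflink"`. Naming the two positions as constants keeps the index-based access readable until attribute access works again.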