pyFlink UDTF function registration

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

pyFlink UDTF function registration

Manas Kale
Hi,
I am trying to use a User-Defined Table Function to split up some data as follows:

from pyflink.table.udf import udtf
@udtf(input_types=DataTypes.STRING(), result_types= [DataTypes.STRING(), DataTypes.DOUBLE()])
def split_feature_values(data_string):
json_data = loads(data_string)
for f_name, f_value in json_data.items():
yield (f_name, f_value)

# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')

# Register UDTF
t_env.register_function("split", split_feature_values)
ddl_source = f"""
CREATE TABLE {INPUT_TABLE} (
`monitorId` STRING,
`deviceId` STRING,
`state` INT,
`data` STRING,
`time_st` TIMESTAMP(3),
WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
"""

ddl_temporary_table = f"""
CREATE TABLE {TEMPORARY_TABLE} (
`monitorId` STRING,
`featureName` STRING,
`featureData` DOUBLE,
`time_st` TIMESTAMP(3),
WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
)
"""

ddl_populate_temporary_table = f"""
INSERT INTO {TEMPORARY_TABLE}
SELECT monitorId, split(data), time_st
FROM {INPUT_TABLE}
"""

t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_temporary_table)
t_env.execute_sql(ddl_populate_temporary_table)

However, I get the following error :
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 3, column 23 to line 3, column 33: No match found for function signature split(<CHARACTER>)

I believe I am using the correct call to register the UDTF as per [1]. Am I missing something?

Thanks,
Manas

Reply | Threaded
Open this post in threaded view
|

Re: pyFlink UDTF function registration

Xingbo Huang
Hi Manas,
You need to join with the python udtf function. You can try the following sql:

ddl_populate_temporary_table = f"""
    INSERT INTO {TEMPORARY_TABLE}
    SELECT * FROM (
    SELECT monitorId, featureName, featureData, time_st
    FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName, featureData)) t
"""

Best,
Xingbo

Manas Kale <[hidden email]> 于2020年7月15日周三 下午7:31写道:
Hi,
I am trying to use a User-Defined Table Function to split up some data as follows:

from pyflink.table.udf import udtf
@udtf(input_types=DataTypes.STRING(), result_types= [DataTypes.STRING(), DataTypes.DOUBLE()])
def split_feature_values(data_string):
json_data = loads(data_string)
for f_name, f_value in json_data.items():
yield (f_name, f_value)

# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')

# Register UDTF
t_env.register_function("split", split_feature_values)
ddl_source = f"""
CREATE TABLE {INPUT_TABLE} (
`monitorId` STRING,
`deviceId` STRING,
`state` INT,
`data` STRING,
`time_st` TIMESTAMP(3),
WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
"""

ddl_temporary_table = f"""
CREATE TABLE {TEMPORARY_TABLE} (
`monitorId` STRING,
`featureName` STRING,
`featureData` DOUBLE,
`time_st` TIMESTAMP(3),
WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
)
"""

ddl_populate_temporary_table = f"""
INSERT INTO {TEMPORARY_TABLE}
SELECT monitorId, split(data), time_st
FROM {INPUT_TABLE}
"""

t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_temporary_table)
t_env.execute_sql(ddl_populate_temporary_table)

However, I get the following error :
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 3, column 23 to line 3, column 33: No match found for function signature split(<CHARACTER>)

I believe I am using the correct call to register the UDTF as per [1]. Am I missing something?

Thanks,
Manas

Reply | Threaded
Open this post in threaded view
|

Re: pyFlink UDTF function registration

Manas Kale
Hi Xingbo,
Thank you for the elaboration. Note that all of this is for a streaming job.
I used this code to create a SQL VIEW : 
f"""
CREATE VIEW TMP_TABLE AS
SELECT monitorId, featureName, featureData, time_st FROM (
SELECT monitorId, featureName, featureData, time_st
FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName, featureData)) t
"""
and then tried executing a Tumbling window on it:
f"""
INSERT INTO {OUTPUT_TABLE}
SELECT monitorId, MAX(featureData), MIN(featureData), AVG(featureData), TUMBLE_START(time_st, INTERVAL '2' SECOND)
FROM TMP_TABLE
GROUP BY TUMBLE(time_st, INTERVAL '2' SECOND), monitorId, featureName
""")
But I am getting this error:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'data' not found in any table

I don't understand how it is expecting column "data" as I explicitly did not select that in the CREATE VIEW statement. 
Also, is it valid to use an SQL VIEW for a streaming job such as this?

Regards,
Manas


On Wed, Jul 15, 2020 at 5:42 PM Xingbo Huang <[hidden email]> wrote:
Hi Manas,
You need to join with the python udtf function. You can try the following sql:

ddl_populate_temporary_table = f"""
    INSERT INTO {TEMPORARY_TABLE}
    SELECT * FROM (
    SELECT monitorId, featureName, featureData, time_st
    FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName, featureData)) t
"""

Best,
Xingbo

Manas Kale <[hidden email]> 于2020年7月15日周三 下午7:31写道:
Hi,
I am trying to use a User-Defined Table Function to split up some data as follows:

from pyflink.table.udf import udtf
@udtf(input_types=DataTypes.STRING(), result_types= [DataTypes.STRING(), DataTypes.DOUBLE()])
def split_feature_values(data_string):
json_data = loads(data_string)
for f_name, f_value in json_data.items():
yield (f_name, f_value)

# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')

# Register UDTF
t_env.register_function("split", split_feature_values)
ddl_source = f"""
CREATE TABLE {INPUT_TABLE} (
`monitorId` STRING,
`deviceId` STRING,
`state` INT,
`data` STRING,
`time_st` TIMESTAMP(3),
WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
"""

ddl_temporary_table = f"""
CREATE TABLE {TEMPORARY_TABLE} (
`monitorId` STRING,
`featureName` STRING,
`featureData` DOUBLE,
`time_st` TIMESTAMP(3),
WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
)
"""

ddl_populate_temporary_table = f"""
INSERT INTO {TEMPORARY_TABLE}
SELECT monitorId, split(data), time_st
FROM {INPUT_TABLE}
"""

t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_temporary_table)
t_env.execute_sql(ddl_populate_temporary_table)

However, I get the following error :
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 3, column 23 to line 3, column 33: No match found for function signature split(<CHARACTER>)

I believe I am using the correct call to register the UDTF as per [1]. Am I missing something?

Thanks,
Manas