PyFlink DDL UDTF join error

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

PyFlink DDL UDTF join error

Manas Kale
Hi,
Using pyFlink DDL, I am trying to:
  1. Consume a Kafka JSON stream. This has messages with aggregate data, example:  "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}"
  2. I am splitting field "data" so that I can process its values individually. For that, I have defined a UDTF.
  3. I store the UDTF output in a temporary view. (Meaning each output of the UDTF will contain "0001" 105.0, "0002" 1.21 etc...)
  4. I use the values in this temporary view to calculate some aggregation metrics.
I am getting an SQL error for  step 4.
Code:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes, Row
from pyflink.table.udf import udtf
from json import loads
exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = StreamTableEnvironment.create(exec_env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())


@udtf(input_types=DataTypes.STRING(), result_types= [DataTypes.STRING(),DataTypes.DOUBLE()])
def split_feature_values(data_string):
json_data = loads(data_string)
for f_name, f_value in json_data.items():
yield f_name, f_value

# configure the off-heap memory of current taskmanager to enable the python worker uses off-heap memory.
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')

# Register UDTF
t_env.register_function("split", split_feature_values)
# ... string constants....

# Init Kafka input table
t_env.execute_sql(f"""
CREATE TABLE {INPUT_TABLE} (
monitorId STRING,
deviceId STRING,
state INT,
data STRING,
time_str TIMESTAMP(3),
WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
""")

# 10 sec summary table
t_env.execute_sql(f"""
CREATE TABLE {TEN_SEC_OUTPUT_TABLE} (
monitorId STRING,
featureName STRING,
maxFv DOUBLE,
minFv DOUBLE,
avgFv DOUBLE,
windowStart TIMESTAMP(3),
WATERMARK FOR windowStart AS windowStart
) WITH (
'connector' = 'kafka',
'topic' = '{TEN_SEC_OUTPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
""")

# Join with UDTF
t_env.execute_sql(f"""
CREATE VIEW tmp_view AS
SELECT * FROM (
SELECT monitorId, T.featureName, T.featureValue, time_str
FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName, featureValue)
)
""")

# Create 10 second view <--------------------- this causes the error
t_env.execute_sql(f"""
INSERT INTO {TEN_SEC_OUTPUT_TABLE}
SELECT monitorId, featureName, MAX(featureValue), MIN(featureValue), AVG(featureValue), TUMBLE_START(time_str, INTERVAL '10' SECOND)
FROM tmp_view
GROUP BY TUMBLE(time_str, INTERVAL '10' SECOND), monitorId, featureName
""")

The last SQL statement where I calculate metrics causes the error. The error message is :
Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/aggregate_streaming_job.py", line 97, in <module>
    """)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 543, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 4, column 15 to line 4, column 20: Column 'data' not found in any table

I don't understand why Flink wants a "data" column. I discard the "data" column in the temporary view, and it certainly does not exist in the
TEN_SECOND_OUTPUT_TABLE. The only place it exists is in the initial INPUT_TABLE which is not relevant for the erroneous SQL statement!
Clearly I missed understanding something. Have I missed something when creating the temporary view?  



Reply | Threaded
Open this post in threaded view
|

Re: PyFlink DDL UDTF join error

Wei Zhong
Hi Manas,

It seems like a bug. You can try to replace the udtf sql call with such code as a workaround currently:

t_env.register_table("tmp_view", t_env.from_path(f"{INPUT_TABLE}").join_lateral("split(data) as (featureName, featureValue)"))

This works for me. I’ll try to find out what caused this exception.

Best,
Wei

在 2020年7月28日,18:33,Manas Kale <[hidden email]> 写道:

Hi,
Using pyFlink DDL, I am trying to:
  1. Consume a Kafka JSON stream. This has messages with aggregate data, example:  "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}"
  2. I am splitting field "data" so that I can process its values individually. For that, I have defined a UDTF.
  3. I store the UDTF output in a temporary view. (Meaning each output of the UDTF will contain "0001" 105.0, "0002" 1.21 etc...)
  4. I use the values in this temporary view to calculate some aggregation metrics.
I am getting an SQL error for  step 4.
Code:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes, Row
from pyflink.table.udf import udtf
from json import loads
exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = StreamTableEnvironment.create(exec_env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())


@udtf(input_types=DataTypes.STRING(), result_types= [DataTypes.STRING(),DataTypes.DOUBLE()])
def split_feature_values(data_string):
json_data = loads(data_string)
for f_name, f_value in json_data.items():
yield f_name, f_value

# configure the off-heap memory of current taskmanager to enable the python worker uses off-heap memory.
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')

# Register UDTF
t_env.register_function("split", split_feature_values)
# ... string constants....

# Init Kafka input table
t_env.execute_sql(f"""
CREATE TABLE {INPUT_TABLE} (
monitorId STRING,
deviceId STRING,
state INT,
data STRING,
time_str TIMESTAMP(3),
WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
""")

# 10 sec summary table
t_env.execute_sql(f"""
CREATE TABLE {TEN_SEC_OUTPUT_TABLE} (
monitorId STRING,
featureName STRING,
maxFv DOUBLE,
minFv DOUBLE,
avgFv DOUBLE,
windowStart TIMESTAMP(3),
WATERMARK FOR windowStart AS windowStart
) WITH (
'connector' = 'kafka',
'topic' = '{TEN_SEC_OUTPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
""")

# Join with UDTF
t_env.execute_sql(f"""
CREATE VIEW tmp_view AS
SELECT * FROM (
SELECT monitorId, T.featureName, T.featureValue, time_str
FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName, featureValue)
)
""")

# Create 10 second view <--------------------- this causes the error
t_env.execute_sql(f"""
INSERT INTO {TEN_SEC_OUTPUT_TABLE}
SELECT monitorId, featureName, MAX(featureValue), MIN(featureValue), AVG(featureValue), TUMBLE_START(time_str, INTERVAL '10' SECOND)
FROM tmp_view
GROUP BY TUMBLE(time_str, INTERVAL '10' SECOND), monitorId, featureName
""")

The last SQL statement where I calculate metrics causes the error. The error message is :
Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/aggregate_streaming_job.py", line 97, in <module>
    """)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 543, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 4, column 15 to line 4, column 20: Column 'data' not found in any table

I don't understand why Flink wants a "data" column. I discard the "data" column in the temporary view, and it certainly does not exist in the
TEN_SECOND_OUTPUT_TABLE. The only place it exists is in the initial INPUT_TABLE which is not relevant for the erroneous SQL statement!
Clearly I missed understanding something. Have I missed something when creating the temporary view?  




Reply | Threaded
Open this post in threaded view
|

Re: PyFlink DDL UDTF join error

Wei Zhong
Hi Manas,

It seems a bug of the create view operation. I have created a JIRA for it: https://issues.apache.org/jira/browse/FLINK-18750

Before repairing, please do not use create view operation for udtf call. 

Best,
Wei

在 2020年7月28日,21:19,Wei Zhong <[hidden email]> 写道:

Hi Manas,

It seems like a bug. You can try to replace the udtf sql call with such code as a workaround currently:

t_env.register_table("tmp_view", t_env.from_path(f"{INPUT_TABLE}").join_lateral("split(data) as (featureName, featureValue)"))

This works for me. I’ll try to find out what caused this exception.

Best,
Wei

在 2020年7月28日,18:33,Manas Kale <[hidden email]> 写道:

Hi,
Using pyFlink DDL, I am trying to:
  1. Consume a Kafka JSON stream. This has messages with aggregate data, example:  "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}"
  2. I am splitting field "data" so that I can process its values individually. For that, I have defined a UDTF.
  3. I store the UDTF output in a temporary view. (Meaning each output of the UDTF will contain "0001" 105.0, "0002" 1.21 etc...)
  4. I use the values in this temporary view to calculate some aggregation metrics.
I am getting an SQL error for  step 4.
Code:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes, Row
from pyflink.table.udf import udtf
from json import loads
exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = StreamTableEnvironment.create(exec_env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())


@udtf(input_types=DataTypes.STRING(), result_types= [DataTypes.STRING(),DataTypes.DOUBLE()])
def split_feature_values(data_string):
json_data = loads(data_string)
for f_name, f_value in json_data.items():
yield f_name, f_value

# configure the off-heap memory of current taskmanager to enable the python worker uses off-heap memory.
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')

# Register UDTF
t_env.register_function("split", split_feature_values)
# ... string constants....

# Init Kafka input table
t_env.execute_sql(f"""
CREATE TABLE {INPUT_TABLE} (
monitorId STRING,
deviceId STRING,
state INT,
data STRING,
time_str TIMESTAMP(3),
WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
""")

# 10 sec summary table
t_env.execute_sql(f"""
CREATE TABLE {TEN_SEC_OUTPUT_TABLE} (
monitorId STRING,
featureName STRING,
maxFv DOUBLE,
minFv DOUBLE,
avgFv DOUBLE,
windowStart TIMESTAMP(3),
WATERMARK FOR windowStart AS windowStart
) WITH (
'connector' = 'kafka',
'topic' = '{TEN_SEC_OUTPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
""")

# Join with UDTF
t_env.execute_sql(f"""
CREATE VIEW tmp_view AS
SELECT * FROM (
SELECT monitorId, T.featureName, T.featureValue, time_str
FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName, featureValue)
)
""")

# Create 10 second view <--------------------- this causes the error
t_env.execute_sql(f"""
INSERT INTO {TEN_SEC_OUTPUT_TABLE}
SELECT monitorId, featureName, MAX(featureValue), MIN(featureValue), AVG(featureValue), TUMBLE_START(time_str, INTERVAL '10' SECOND)
FROM tmp_view
GROUP BY TUMBLE(time_str, INTERVAL '10' SECOND), monitorId, featureName
""")

The last SQL statement where I calculate metrics causes the error. The error message is :
Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/aggregate_streaming_job.py", line 97, in <module>
    """)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 543, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 4, column 15 to line 4, column 20: Column 'data' not found in any table

I don't understand why Flink wants a "data" column. I discard the "data" column in the temporary view, and it certainly does not exist in the
TEN_SECOND_OUTPUT_TABLE. The only place it exists is in the initial INPUT_TABLE which is not relevant for the erroneous SQL statement!
Clearly I missed understanding something. Have I missed something when creating the temporary view?  





Reply | Threaded
Open this post in threaded view
|

Re: PyFlink DDL UDTF join error

Manas Kale
Hi Wei,
Thank you for the clarification and workaround.

Regards,
Manas

On Wed, Jul 29, 2020 at 12:55 PM Wei Zhong <[hidden email]> wrote:
Hi Manas,

It seems a bug of the create view operation. I have created a JIRA for it: https://issues.apache.org/jira/browse/FLINK-18750

Before repairing, please do not use create view operation for udtf call. 

Best,
Wei

在 2020年7月28日,21:19,Wei Zhong <[hidden email]> 写道:

Hi Manas,

It seems like a bug. You can try to replace the udtf sql call with such code as a workaround currently:

t_env.register_table("tmp_view", t_env.from_path(f"{INPUT_TABLE}").join_lateral("split(data) as (featureName, featureValue)"))

This works for me. I’ll try to find out what caused this exception.

Best,
Wei

在 2020年7月28日,18:33,Manas Kale <[hidden email]> 写道:

Hi,
Using pyFlink DDL, I am trying to:
  1. Consume a Kafka JSON stream. This has messages with aggregate data, example:  "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}"
  2. I am splitting field "data" so that I can process its values individually. For that, I have defined a UDTF.
  3. I store the UDTF output in a temporary view. (Meaning each output of the UDTF will contain "0001" 105.0, "0002" 1.21 etc...)
  4. I use the values in this temporary view to calculate some aggregation metrics.
I am getting an SQL error for  step 4.
Code:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes, Row
from pyflink.table.udf import udtf
from json import loads
exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = StreamTableEnvironment.create(exec_env, environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())


@udtf(input_types=DataTypes.STRING(), result_types= [DataTypes.STRING(),DataTypes.DOUBLE()])
def split_feature_values(data_string):
json_data = loads(data_string)
for f_name, f_value in json_data.items():
yield f_name, f_value

# configure the off-heap memory of current taskmanager to enable the python worker uses off-heap memory.
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')

# Register UDTF
t_env.register_function("split", split_feature_values)
# ... string constants....

# Init Kafka input table
t_env.execute_sql(f"""
CREATE TABLE {INPUT_TABLE} (
monitorId STRING,
deviceId STRING,
state INT,
data STRING,
time_str TIMESTAMP(3),
WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
""")

# 10 sec summary table
t_env.execute_sql(f"""
CREATE TABLE {TEN_SEC_OUTPUT_TABLE} (
monitorId STRING,
featureName STRING,
maxFv DOUBLE,
minFv DOUBLE,
avgFv DOUBLE,
windowStart TIMESTAMP(3),
WATERMARK FOR windowStart AS windowStart
) WITH (
'connector' = 'kafka',
'topic' = '{TEN_SEC_OUTPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
""")

# Join with UDTF
t_env.execute_sql(f"""
CREATE VIEW tmp_view AS
SELECT * FROM (
SELECT monitorId, T.featureName, T.featureValue, time_str
FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName, featureValue)
)
""")

# Create 10 second view <--------------------- this causes the error
t_env.execute_sql(f"""
INSERT INTO {TEN_SEC_OUTPUT_TABLE}
SELECT monitorId, featureName, MAX(featureValue), MIN(featureValue), AVG(featureValue), TUMBLE_START(time_str, INTERVAL '10' SECOND)
FROM tmp_view
GROUP BY TUMBLE(time_str, INTERVAL '10' SECOND), monitorId, featureName
""")

The last SQL statement where I calculate metrics causes the error. The error message is :
Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/aggregate_streaming_job.py", line 97, in <module>
    """)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 543, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 4, column 15 to line 4, column 20: Column 'data' not found in any table

I don't understand why Flink wants a "data" column. I discard the "data" column in the temporary view, and it certainly does not exist in the
TEN_SECOND_OUTPUT_TABLE. The only place it exists is in the initial INPUT_TABLE which is not relevant for the erroneous SQL statement!
Clearly I missed understanding something. Have I missed something when creating the temporary view?