The input types should be as following:
input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]
Regards,
Dian
目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def drop_fields(message, *fields):
import json
message = json.loads(message)
for field in fields:
message.pop(field)
return json.dumps(message)
st_env \
.form_path("source") \
.select("drop_fields(message,'x')") \
.insert_into("sink")
message 格式:
{“a”:"1","x","2"}
报错参数类型不匹配:
Actual:(java.lang.String, java.lang.String)
Expected:(org.apache.flink.table.dataformat.BinaryString)
新手入门,请多指教,感谢。