pyflink Table Api 使用udf函数报错 Given parameters do not match any signature.

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

pyflink Table Api 使用udf函数报错 Given parameters do not match any signature.

刘亚坤
目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def drop_fields(message, *fields):
      import json
      message = json.loads(message)
      for field in fields:
            message.pop(field)
      return  json.dumps(message)


st_env \
      .form_path("source") \
      .select("drop_fields(message,'x')") \
      .insert_into("sink")

message 格式:
{“a”:"1","x","2"}

报错参数类型不匹配:
Actual:(java.lang.String, java.lang.String)
Expected:(org.apache.flink.table.dataformat.BinaryString)

新手入门,请多指教,感谢。
Reply | Threaded
Open this post in threaded view
|

Re: pyflink Table Api 使用udf函数报错 Given parameters do not match any signature.

Dian Fu
The input types should be as following:

input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]

Regards,
Dian

在 2020年6月1日,上午10:49,刘亚坤 <[hidden email]> 写道:

目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def drop_fields(message, *fields):
      import json
      message = json.loads(message)
      for field in fields:
            message.pop(field)
      return  json.dumps(message)


st_env \
      .form_path("source") \
      .select("drop_fields(message,'x')") \
      .insert_into("sink")

message 格式:
{“a”:"1","x","2"}

报错参数类型不匹配:
Actual:(java.lang.String, java.lang.String)
Expected:(org.apache.flink.table.dataformat.BinaryString)

新手入门,请多指教,感谢。