I am trying to sink the rowtime field in pyflink 1.10. I get the following error For the source schema I use .field("rowtime", DataTypes.TIMESTAMP(2)) .rowtime( Rowtime() .timestamps_from_field("timestamp") .watermarks_periodic_ascending() ) To create the rowtime field and have tried variations on .field("rowtime", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()) In the sink schema. Trying all of the different types in DataTypes I get essentially the following error: py4j.protocol.Py4JJavaError: An error occurred while calling o56.insertInto. : org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`output` do not match. Query result schema: [rowtime: LocalDateTime] TableSink schema: [rowtime: Timestamp] I know that in Java there is org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME and the python documentation lists Types.SQL_TIMESTAMP, but I cannot find the corresponding type in the python library.
Can anyone help point me to the correct type for the schema? Thanks, Jesse |
Hi Jesse, I think that the type of rowtime you declared on the source schema is DataTypes.Timestamp(), you also use DataTypes.Timestamp() on the sink schema Best, Xingbo Jesse Lord <[hidden email]> 于2020年7月15日周三 下午11:41写道:
|
Free forum by Nabble | Edit this page |