Pyflink sink rowtime field

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Pyflink sink rowtime field

Jesse Lord

I am trying to sink the rowtime field in pyflink 1.10. I get the following error

 

For the source schema I use

 

    .field("rowtime", DataTypes.TIMESTAMP(2))

        .rowtime(

            Rowtime()

            .timestamps_from_field("timestamp")

            .watermarks_periodic_ascending()

        )

 

To create the rowtime field and have tried variations on

 

    .field("rowtime", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())

 

In the sink schema.

 

Trying all of the different types in DataTypes I get essentially the following error:

 

py4j.protocol.Py4JJavaError: An error occurred while calling o56.insertInto.

: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`output` do not match.

Query result schema: [rowtime: LocalDateTime]

TableSink schema:    [rowtime: Timestamp]

 

 

I know that in Java there is org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME and the python documentation lists Types.SQL_TIMESTAMP, but I cannot find the corresponding type in the python library. Can anyone help point me to the correct type for the schema?

 

Thanks,

Jesse

 

 

 

 

 

Reply | Threaded
Open this post in threaded view
|

Re: Pyflink sink rowtime field

Xingbo Huang
Hi Jesse,
I think that the type of rowtime you declared on the source schema is DataTypes.Timestamp(), you also use DataTypes.Timestamp() on the sink schema

Best,
Xingbo

Jesse Lord <[hidden email]> 于2020年7月15日周三 下午11:41写道:

I am trying to sink the rowtime field in pyflink 1.10. I get the following error

 

For the source schema I use

 

    .field("rowtime", DataTypes.TIMESTAMP(2))

        .rowtime(

            Rowtime()

            .timestamps_from_field("timestamp")

            .watermarks_periodic_ascending()

        )

 

To create the rowtime field and have tried variations on

 

    .field("rowtime", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())

 

In the sink schema.

 

Trying all of the different types in DataTypes I get essentially the following error:

 

py4j.protocol.Py4JJavaError: An error occurred while calling o56.insertInto.

: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`output` do not match.

Query result schema: [rowtime: LocalDateTime]

TableSink schema:    [rowtime: Timestamp]

 

 

I know that in Java there is org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME and the python documentation lists Types.SQL_TIMESTAMP, but I cannot find the corresponding type in the python library. Can anyone help point me to the correct type for the schema?

 

Thanks,

Jesse