Timestamp type mismatch between Flink, Iceberg, and Avro


Xingcan Cui
Hi all,

Recently, I tried to use Flink to write some Avro data to Iceberg. However, the timestamp representations for these systems really confused me. Here are some facts:
  • Avro uses `java.time.Instant` for logical type `timestamp_ms`;
  • Flink takes `java.time.Instant` as table type `TIMESTAMP_WITH_LOCAL_TIME_ZONE`;
  • Iceberg takes Avro `timestamp_ms` as timestamp without timezone.
When I used the Flink DataType TIMESTAMP for Avro's timestamp_ms, I got the following error: "class java.time.Instant cannot be cast to class java.time.LocalDateTime".
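
For readers who want to reproduce the cast error outside of a pipeline, here is a minimal sketch using only `java.time`. It assumes the failure comes from a converter that expects `java.time.LocalDateTime` (the default conversion class for Flink's TIMESTAMP) but receives the `java.time.Instant` produced by the Avro decoder. The class and method names are hypothetical, and the UTC-based conversion is just one possible workaround, not something taken from the Flink or Iceberg code. (The Avro specification spells the logical type `timestamp-millis`.)

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical demo class; not part of Flink, Iceberg, or Avro.
class TimestampCastDemo {

    // Stands in for a converter that blindly expects a LocalDateTime.
    static LocalDateTime castToLocalDateTime(Object value) {
        return (LocalDateTime) value; // throws ClassCastException for an Instant
    }

    // Explicit conversion: read the instant as wall-clock time in UTC.
    // Choosing UTC is an assumption; another zone gives different fields.
    static LocalDateTime toLocalDateTimeUtc(Instant instant) {
        return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // Pretend this value was decoded from an Avro timestamp field.
        Instant decoded = Instant.ofEpochMilli(1621610580000L);

        try {
            castToLocalDateTime(decoded);
        } catch (ClassCastException e) {
            System.out.println("Cast failed: " + e.getMessage());
        }

        // The explicit conversion succeeds.
        System.out.println("Converted: " + toLocalDateTimeUtc(decoded));
    }
}
```

The point of the sketch is that `Instant` and `LocalDateTime` are unrelated classes, so one of them has to be converted explicitly (with a chosen zone) before the other side will accept it.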

If I change the Flink DataType to TIMESTAMP_WITH_LOCAL_TIME_ZONE, Iceberg complains "timestamptz cannot be promoted to timestamp".

Does anyone have any thoughts on this?

Thanks,
Xingcan
Re: Timestamp type mismatch between Flink, Iceberg, and Avro

Timo Walther
Hi Xingcan,

We have had several discussions about timestamps in Flink and now have
a clear picture. Some background:

https://docs.google.com/document/d/1gNRww9mZJcHvUDCXklzjFEQGpefsuR_akCDfWsdE35Q/edit#

So whenever an instant or epoch time is required, TIMESTAMP_LTZ is the
way to go. However, since you can also represent a TIMESTAMP as a long
value (this is also done internally), we can also support TIMESTAMP in
connectors.
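
To illustrate the "TIMESTAMP as a long value" idea in general terms, here is a small sketch with plain `java.time`. It assumes the long holds milliseconds counted from 1970-01-01T00:00 with the wall-clock fields read as if they were UTC; that convention is an assumption for the example and is not meant to describe Flink's actual internal layout. The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical demo class; not Flink's internal representation.
class TimestampAsLongDemo {

    // Instant semantics (what TIMESTAMP_LTZ models): an absolute point in time.
    static long instantToMillis(Instant instant) {
        return instant.toEpochMilli();
    }

    // Zone-less timestamp as a long: treat the wall-clock fields as UTC.
    static long localToMillis(LocalDateTime local) {
        return local.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Inverse of localToMillis under the same UTC assumption.
    static LocalDateTime millisToLocal(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        LocalDateTime local = LocalDateTime.of(2021, 5, 21, 15, 23);
        long millis = localToMillis(local);

        System.out.println("As long: " + millis);
        System.out.println("Round trip equal: " + millisToLocal(millis).equals(local));
    }
}
```

Both kinds of timestamp end up as the same kind of number; what differs is whether the number is interpreted as an absolute instant or as zone-less wall-clock fields, which is why a connector has to be explicit about which one it means.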

So I would assume that the issue is on the connector side, which is not
properly integrated into the SQL type system. It might be a bug.

Regards,
Timo



On 21.05.21 17:23, Xingcan Cui wrote:

> Hi all,
>
> Recently, I tried to use Flink to write some Avro data to Iceberg.
> However, the timestamp representations for these systems really confused
> me. Here are some facts:
>
>   * Avro uses `java.time.Instant` for logical type `timestamp_ms`;
>   * Flink takes `java.time.Instant` as table type
>     `TIMESTAMP_WITH_LOCAL_TIME_ZONE`;
>   * Iceberg takes Avro `timestamp_ms` as timestamp without timezone.
>
> When I used Flink DataType TIMESTAMP for timestamp_ms of Avro, I got the
> following error "*class java.time.Instant cannot be cast to class
> java.time.LocalDateTime*".
>
> If I change the Flink DataType to TIMESTAMP_WITH_LOCAL_TIME_ZONE,
> Iceberg complains "*timestamptz cannot be promoted to timestamp*".
>
> Does anyone have any thoughts on this?
>
> Thanks,
> Xingcan

Re: Timestamp type mismatch between Flink, Iceberg, and Avro

Xingcan Cui
Hi Timo,

Thanks for the reply! The document is really helpful. I can solve my current problem with some workarounds. Will keep an eye on this topic!

Best,
Xingcan

On Fri, May 21, 2021, 12:08 Timo Walther <[hidden email]> wrote:
Hi Xingcan,

We have had several discussions about timestamps in Flink and now have
a clear picture. Some background:

https://docs.google.com/document/d/1gNRww9mZJcHvUDCXklzjFEQGpefsuR_akCDfWsdE35Q/edit#

So whenever an instant or epoch time is required, TIMESTAMP_LTZ is the
way to go. However, since you can also represent a TIMESTAMP as a long
value (this is also done internally), we can also support TIMESTAMP in
connectors.

So I would assume that the issue is on the connector side, which is not
properly integrated into the SQL type system. It might be a bug.

Regards,
Timo


