Timestamp conversion problem in Flink Table/SQL


jia yichao
Hi community,


Recently I encountered a problem with time conversion in Flink Table/SQL. When a processed field is of timestamp type, the code generated by the Flink Table codegen first converts the timestamp to a long, and then converts the long back to a timestamp on output.
The generated code uses
"public static long toLong(Timestamp v)" and
"public static java.sql.Timestamp internalToTimestamp(long v)"
for the conversion.
Internally, both methods add or subtract the local time zone offset.
In some cases the two methods are not applied in pairs, which makes the converted time incorrect: the watermark timestamp metric on the web UI equals the correct value plus the time zone offset, and the output of the processing-time field equals the correct value minus the time zone offset.

Why do the time conversion methods in Calcite (SqlFunctions.java) add or subtract the time zone offset? And why does Flink Table/SQL use these conversion methods instead of timestamp.getTime()?


============ Calcite SqlFunctions.java ============
/** Converts the Java type used for UDF parameters of SQL TIMESTAMP type
 * ({@link java.sql.Timestamp}) to internal representation (long).
 *
 * <p>Converse of {@link #internalToTimestamp(long)}. */
public static long toLong(Timestamp v) {
  return toLong(v, LOCAL_TZ);
}

// mainly intended for java.sql.Timestamp but works for other dates also
public static long toLong(java.util.Date v, TimeZone timeZone) {
  final long time = v.getTime();
  return time + timeZone.getOffset(time);
}

/** Converts the internal representation of a SQL TIMESTAMP (long) to the Java
 * type used for UDF parameters ({@link java.sql.Timestamp}). */
public static java.sql.Timestamp internalToTimestamp(long v) {
  return new java.sql.Timestamp(v - LOCAL_TZ.getOffset(v));
}
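
To make the effect concrete, here is a minimal, self-contained sketch (plain Java, not Flink code; the class name OffsetDemo is made up, and LOCAL_TZ stands in for Calcite's field of the same name, i.e. the JVM default zone). A paired round trip cancels out, while an unpaired call shifts the value by the zone offset:

============ OffsetDemo.java (illustrative sketch) ============
import java.sql.Timestamp;
import java.util.TimeZone;

public class OffsetDemo {
  // Stand-in for Calcite's LOCAL_TZ: the JVM's default time zone.
  private static final TimeZone LOCAL_TZ = TimeZone.getDefault();

  // Mirrors SqlFunctions.toLong(Timestamp): epoch millis plus the zone offset.
  static long toLong(Timestamp v) {
    final long time = v.getTime();
    return time + LOCAL_TZ.getOffset(time);
  }

  // Mirrors SqlFunctions.internalToTimestamp(long): epoch millis minus the zone offset.
  static Timestamp internalToTimestamp(long v) {
    return new Timestamp(v - LOCAL_TZ.getOffset(v));
  }

  public static void main(String[] args) {
    Timestamp original = new Timestamp(1_545_897_600_000L); // 2018-12-27 08:00:00 UTC

    // Paired usage: the offsets cancel (assuming no DST change in between),
    // so the round trip returns the original value.
    System.out.println(internalToTimestamp(toLong(original)).equals(original)); // true

    // Unpaired usage: pushing a raw epoch value through only one side
    // shifts the result by the zone offset (e.g. 8 hours in UTC+8).
    System.out.println(internalToTimestamp(original.getTime()));
  }
}

When the generated code applies only one half of the pair, this one-sided shift is exactly the plus/minus offset error visible on the watermark metric and the proctime output.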



Related issue: https://github.com/apache/flink/pull/7180



Thanks,
Jiayichao

Re: Timestamp conversion problem in Flink Table/SQL

Hequn Cheng
Hi Jiayichao,

The two methods do not have to appear in pairs, so you can't use timestamp.getTime() directly.
Currently, Flink doesn't support time zone configuration. A timestamp (a time of type Timestamp) always means the time in UTC+0. So in the test in your PR [1], the output timestamp means a time in UTC+0, not a time in your time zone. You can verify this by changing your SQL to:
String sqlQuery = "select proctime, LOCALTIMESTAMP from MyTable";
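
In other words, the internal long is interpreted as wall-clock time in UTC+0, and rendering it in a local zone is a separate, explicit step. A small sketch with plain java.time (not Flink API; the zone Asia/Shanghai and the class name are just examples):

============ UtcDemo.java (illustrative sketch) ============
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class UtcDemo {
  public static void main(String[] args) {
    // An internal timestamp value (epoch millis): 2018-12-27 08:00:00 UTC.
    long internal = 1_545_897_600_000L;

    // Flink reads the internal value as a wall-clock time in UTC+0 ...
    LocalDateTime utc =
        LocalDateTime.ofInstant(Instant.ofEpochMilli(internal), ZoneOffset.UTC);

    // ... so displaying it in a local zone needs an explicit conversion.
    LocalDateTime local =
        LocalDateTime.ofInstant(Instant.ofEpochMilli(internal), ZoneId.of("Asia/Shanghai"));

    System.out.println("UTC+0: " + utc);   // 2018-12-27T08:00
    System.out.println("local: " + local); // 2018-12-27T16:00 (UTC+8)
  }
}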

But you raised a good question, and it is true that it would be better to support time zone configuration in Flink, for example a global time zone option. However, it is not a one- or two-line code change; we need to take all operators into consideration, and it is better to solve it once and for all.

Best, Hequn

[1] https://github.com/apache/flink/pull/7180


