Question about Timestamp in Flink SQL

wangsan
Hi all,

While using Timestamp in Flink SQL, how can I set the time zone? My current time zone is GMT+8, and I found that the selected processing time is always 8 hours behind the current time. The same happens with extracted event time.

Here’s my simplified code:
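```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setParallelism(1)
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val sTableEnv = TableEnvironment.getTableEnvironment(senv)
// Print the local wall-clock time to compare against the proctime column
println(s"current time: ${new SimpleDateFormat("yyyy.MM.dd HH:mm:ss.SSS", Locale.CHINA).format(new Date())}")

// Read lines from a socket and append a processing-time attribute 't'
val stream: DataStream[(String, String, String)] = senv.socketTextStream("localhost", 9999).map(line => (line, line, line))
val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 't.proctime)
sTableEnv.registerTable("foo", table)
val result = sTableEnv.sql("select * from foo")
result.printSchema()
result.toAppendStream[Row].print()

senv.execute("foo")
```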
And here’s the result:

<PastedGraphic-1.png>

Best,
wangsan

Re: Question about Timestamp in Flink SQL

Xingcan Cui
Hi wangsan,

In Flink, processing time is implemented simply by calling System.currentTimeMillis(), and the resulting long value is automatically wrapped into a Timestamp by the following statement:

`new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`

You can check whether TimeZone.getDefault() returns the right time zone. Normally, it reflects the default time zone of your operating system.
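To see what the quoted statement does on a given machine, a small standalone sketch can print the JVM default zone and the shift the statement applies. The class and method names (`ProctimeWrapDemo`, `wrap`) are hypothetical; only the expression inside `wrap` comes from the statement above. The zone is passed in explicitly so the arithmetic can be checked independently of the host.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Hypothetical demo of the processing-time wrapping quoted above.
class ProctimeWrapDemo {

    // Same arithmetic as the quoted statement, with the zone passed in
    // explicitly instead of read from TimeZone.getDefault().
    static Timestamp wrap(long time, TimeZone zone) {
        return new Timestamp(time - zone.getOffset(time));
    }

    public static void main(String[] args) {
        // The JVM takes its default zone from the user.timezone system
        // property if it is set, otherwise from the operating system.
        System.out.println("default zone: " + TimeZone.getDefault().getID());

        long now = System.currentTimeMillis();
        Timestamp ts = wrap(now, TimeZone.getDefault());
        // The stored epoch millis differ from 'now' by the zone offset.
        System.out.println("shift (ms): " + (now - ts.getTime()));
    }
}
```

With a GMT+8 default zone the printed shift is 28800000 ms, i.e. the 8 hours observed in the question.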

Hope that helps.

Best,
Xingcan

(In reply to wangsan, Tue, Nov 28, 2017, 9:31 PM)

Re: Question about Timestamp in Flink SQL

wangsan
Hi Xingcan,

Thanks for your reply. 

The system default time zone is exactly what I expected: `sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]`.
I looked into the generated code and found the following snippet:

```java
result$20 = org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
```

And this is what the `internalToTimestamp` function does:

```java
// From Calcite's SqlFunctions; LOCAL_TZ holds the JVM default TimeZone
public static Timestamp internalToTimestamp(long v) {
    return new Timestamp(v - (long)LOCAL_TZ.getOffset(v));
}
```

So if I pass it an event time with Unix timestamp 0, I get Timestamp(-28800000). I am confused: why does `internalToTimestamp` need to subtract the offset?
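A minimal sketch, assuming the Asia/Shanghai zone from this thread, reproduces the numbers above outside Flink. It also hints at why the offset is subtracted: the resulting Timestamp, printed with toString() in the local zone, shows the same wall-clock fields as the UTC reading of the internal long (epoch 0 prints as midnight, 1970-01-01). That reading is an interpretation of the arithmetic, not something taken from Calcite's documentation; `InternalToTimestampDemo` is a hypothetical name, and the zone is passed in rather than read from `LOCAL_TZ`.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Reproduces the arithmetic of Calcite's internalToTimestamp (sketch).
class InternalToTimestampDemo {

    // Same expression as the snippet above: subtract the zone offset.
    static Timestamp internalToTimestamp(long v, TimeZone zone) {
        return new Timestamp(v - zone.getOffset(v));
    }

    public static void main(String[] args) {
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        // Timestamp.toString() formats in the JVM default zone, so pin it.
        TimeZone.setDefault(shanghai);

        Timestamp ts = internalToTimestamp(0L, shanghai);
        System.out.println(ts.getTime()); // -28800000
        System.out.println(ts);           // 1970-01-01 00:00:00.0
    }
}
```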

Best,
wangsan


(In reply to Xingcan Cui, 28 Nov 2017, 11:32 PM)

Re: Question about Timestamp in Flink SQL

Timo Walther
Hi Wangsan,

Currently, timestamps in Flink SQL do not depend on a time zone. All calculations happen on UTC timestamps. This also guarantees that an input created with Timestamp.valueOf("XXX") stays consistent when it is parsed and later output again with toString().
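The round-trip guarantee can be illustrated with a sketch that pairs the subtraction in `internalToTimestamp` with its inverse (adding the offset back, which is assumed here to match what Flink/Calcite does on input). The helper names `toInternal` and `fromInternal` are hypothetical, and the JVM default zone plays the role of `LOCAL_TZ`.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Sketch: a zone-independent internal long keeps valueOf/toString stable.
class ValueOfRoundTrip {

    // Timestamp -> internal long: add the default-zone offset, so the long
    // encodes the wall-clock fields as if they were UTC.
    static long toInternal(Timestamp ts) {
        long t = ts.getTime();
        return t + TimeZone.getDefault().getOffset(t);
    }

    // Internal long -> Timestamp: subtract the offset again.
    static Timestamp fromInternal(long v) {
        return new Timestamp(v - TimeZone.getDefault().getOffset(v));
    }

    public static void main(String[] args) {
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
        Timestamp in = Timestamp.valueOf("2017-11-28 11:00:00");
        long internal = toInternal(in);
        // 'internal' equals the epoch millis of 2017-11-28T11:00:00Z.
        System.out.println(internal);
        // Converting back and printing yields the original text.
        System.out.println(fromInternal(internal)); // 2017-11-28 11:00:00.0
    }
}
```

The internal value is the same whatever the default zone is, which is what makes the string round trip consistent; only the Timestamp object in between carries the local offset.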

Regards,
Timo


(In reply to wangsan, 11/29/17, 3:43 AM)

Re: Question about Timestamp in Flink SQL

wangsan
Hi Timo,

What I am doing is extracting a timestamp field (either a string such as “2017-11-28 11:00:00” or a long value based on my local time zone) as the event-time attribute. So in the timestamp and watermark assigner (assignTimestampsAndWatermarks), I should parse string values as GMT, and for long values I should add the offset, the opposite of what internalToTimestamp does. But the processing-time attribute cannot be made consistent this way. Am I understanding that correctly?
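Both workarounds described above can be sketched as plain helpers that a timestamp extractor might call. This is a hypothetical sketch: `EventTimeShift`, `parseAsGmt` and `shiftToLocalWallClock` are invented names, the local zone is assumed to be Asia/Shanghai (GMT+8) as in this thread, and the long field is assumed to hold ordinary epoch milliseconds.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Sketch of the two event-time workarounds (hypothetical helpers).
class EventTimeShift {

    static final TimeZone LOCAL = TimeZone.getTimeZone("Asia/Shanghai");

    // String field: interpret the local wall-clock text as if it were GMT.
    static long parseAsGmt(String text) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        fmt.setTimeZone(TimeZone.getTimeZone("GMT"));
        try {
            return fmt.parse(text).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("unparseable timestamp: " + text, e);
        }
    }

    // Long field (epoch millis): add the local offset, the opposite of the
    // subtraction performed by internalToTimestamp.
    static long shiftToLocalWallClock(long epochMillis) {
        return epochMillis + LOCAL.getOffset(epochMillis);
    }

    public static void main(String[] args) {
        long fromString = parseAsGmt("2017-11-28 11:00:00");
        // 2017-11-28 11:00 in Shanghai is the instant 2017-11-28T03:00:00Z.
        long fromLong = shiftToLocalWallClock(1511838000000L);
        System.out.println(fromString == fromLong); // true
    }
}
```

Both paths land on the same internal value, so string and long inputs end up aligned; processing time, which Flink generates itself, is not touched by either helper.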

Best,
wangsan



(In reply to Timo Walther, 29 Nov 2017, 4:43 PM)

Re: Question about Timestamp in Flink SQL

Timo Walther
Hi Wangsan,

I opened an issue to document this behavior properly (https://issues.apache.org/jira/browse/FLINK-8169). Basically, both your event-time and processing-time timestamps should be in GMT. We plan to support offsets for windows in the future (https://issues.apache.org/jira/browse/FLINK-8168). Internally, the long values always stay in GMT; only the toString() output depends on the time zone. For now, I would suggest either implementing some user-defined scalar functions for the behavior you want, or simply subtracting the offset (ts - INTERVAL '8' HOURS should work).
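The user-defined scalar function suggestion could look roughly like the following. This is a hypothetical sketch: `ToLocalTime` and `toLocal` are invented names, and the logic sits in a plain static method so it runs without Flink on the classpath. In a real job the body would presumably move into a public `eval(Timestamp)` method of a class extending `org.apache.flink.table.functions.ScalarFunction`, registered via `registerFunction` on the table environment; check the API of the Flink version you use.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Hypothetical core of a scalar UDF that shifts a GMT-based timestamp
// toward local wall-clock time.
class ToLocalTime {

    static Timestamp toLocal(Timestamp ts, TimeZone zone) {
        if (ts == null) {
            return null; // SQL NULL in, SQL NULL out
        }
        long t = ts.getTime();
        // Shift by the zone's offset at that instant.
        return new Timestamp(t + zone.getOffset(t));
    }

    public static void main(String[] args) {
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        System.out.println(toLocal(new Timestamp(0L), shanghai).getTime()); // 28800000
    }
}
```

Compared with a fixed `INTERVAL` expression, a function like this looks the offset up per value, which would matter for zones with daylight saving time; for a fixed GMT+8 zone both approaches shift by the same 8 hours.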

Time zone support definitely needs to be improved in future versions of Flink SQL.

Regards,
Timo



(In reply to wangsan, 11/29/17, 10:50 AM)