Format for timestamp type in Flink SQL

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Format for timestamp type in Flink SQL

Youngwoo Kim (김영우)
Hi,

I'm trying to create a table using Flink SQL to query from a Kafka topic. Messages from Kafka look like following:

(snip)
"svc_mgmt_num":"7749b6a7e17127d43431e21b94f4eb0c116..."
"log_ymdt":"2020-08-15T02:01:33.968Z"
"snapshot_dt":"2020-08-13"
"network_type":"LTE"

I'd like to make the column as 'TIMESTAMP' type but it does not work. Logs from TM:

Caused by: java.time.format.DateTimeParseException: Text '2020-08-15T00:01:29.003Z' could not be parsed at index 10
    at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) ~[?:1.8.0_265]
    at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777) ~[?:1.8.0_265]
......

And also, If I declare the column with TIMEZONE (... log_ymdt TIMESTAMP WITH TIME ZONE, ), the DDL does not work as well.

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at line 5, column 29.
Was expecting:
    "LOCAL" ...

How can I make my date-time format as the timestamp type? I'm running Flink 1.11.1 and executing sql using FlinkSQL CLI.

Thanks,
Youngwoo
Reply | Threaded
Open this post in threaded view
|

Re: Format for timestamp type in Flink SQL

Benchao Li-2
Hi Youngwoo,

What version of Flink and Json Format are you using?
From 1.11, we introduced  `json.timestamp-format.standard` to declare the timestamp format.
You can try `timestamp with local zone` data type with `ISO-8601` timestamp format.

Youngwoo Kim (김영우) <[hidden email]> 于2020年8月15日周六 下午12:12写道:
Hi,

I'm trying to create a table using Flink SQL to query from a Kafka topic. Messages from Kafka look like following:

(snip)
"svc_mgmt_num":"7749b6a7e17127d43431e21b94f4eb0c116..."
"log_ymdt":"2020-08-15T02:01:33.968Z"
"snapshot_dt":"2020-08-13"
"network_type":"LTE"

I'd like to make the column as 'TIMESTAMP' type but it does not work. Logs from TM:

Caused by: java.time.format.DateTimeParseException: Text '2020-08-15T00:01:29.003Z' could not be parsed at index 10
    at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) ~[?:1.8.0_265]
    at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777) ~[?:1.8.0_265]
......

And also, If I declare the column with TIMEZONE (... log_ymdt TIMESTAMP WITH TIME ZONE, ), the DDL does not work as well.

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at line 5, column 29.
Was expecting:
    "LOCAL" ...

How can I make my date-time format as the timestamp type? I'm running Flink 1.11.1 and executing sql using FlinkSQL CLI.

Thanks,
Youngwoo


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: Format for timestamp type in Flink SQL

Youngwoo Kim (김영우)
Hi Benchao,

I include ['json.timestamp-format.standard' = 'ISO-8601'] to table's DDL but it does not work with slightly different errors:

1. TIMESTAMP WITH LOCAL TIME ZONE

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)

Caused by: org.apache.flink.table.api.TableException: Unsupported conversion from data type 'TIMESTAMP(6) WITH LOCAL TIME ZONE' (conversion class: java.time.Instant) to type information. Only data types that originated from type information fully support a reverse conversion.

at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:259)

at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)

at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)

at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)

at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)

at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)

at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)

at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)

at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.convertToRowTypeInfo(LegacyTypeInfoDataTypeConverter.java:329)

at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:237)

at org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)

at org.apache.flink.table.api.TableSchema.toRowType(TableSchema.java:271)

at org.apache.flink.table.client.gateway.local.result.CollectStreamResult.<init>(CollectStreamResult.java:71)

at org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.<init>(MaterializedCollectStreamResult.java:101)

at org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.<init>(MaterializedCollectStreamResult.java:129)

at org.apache.flink.table.client.gateway.local.ResultStore.createResult(ResultStore.java:83)

at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:608)

at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:465)

at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:555)

at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:311)

at java.util.Optional.ifPresent(Optional.java:159)

at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)

at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)

at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)


2. TIMESTAMP(3) WITH LOCAL TIME ZONE


[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Unsupported type: TIMESTAMP(3) WITH LOCAL TIME ZONE


3. TIMESTAMP WITH TIME ZONE and TIMESTAMP(3) WITH TIME ZONE


[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at line 3, column 32.
Was expecting:
    "LOCAL" ...



It looks like the timestamp 'yyyy-MM-ddTHH:mm:ss.SSSZ' is not supported in both 'SQL' and 'ISO-8601' format standard.

Just curious, what is the default format for timestamp type with a time zone?


Thanks,

Youngwoo



On Sat, Aug 15, 2020 at 8:16 PM Benchao Li <[hidden email]> wrote:
Hi Youngwoo,

What version of Flink and Json Format are you using?
From 1.11, we introduced  `json.timestamp-format.standard` to declare the timestamp format.
You can try `timestamp with local zone` data type with `ISO-8601` timestamp format.

Youngwoo Kim (김영우) <[hidden email]> 于2020年8月15日周六 下午12:12写道:
Hi,

I'm trying to create a table using Flink SQL to query from a Kafka topic. Messages from Kafka look like following:

(snip)
"svc_mgmt_num":"7749b6a7e17127d43431e21b94f4eb0c116..."
"log_ymdt":"2020-08-15T02:01:33.968Z"
"snapshot_dt":"2020-08-13"
"network_type":"LTE"

I'd like to make the column as 'TIMESTAMP' type but it does not work. Logs from TM:

Caused by: java.time.format.DateTimeParseException: Text '2020-08-15T00:01:29.003Z' could not be parsed at index 10
    at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) ~[?:1.8.0_265]
    at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777) ~[?:1.8.0_265]
......

And also, If I declare the column with TIMEZONE (... log_ymdt TIMESTAMP WITH TIME ZONE, ), the DDL does not work as well.

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at line 5, column 29.
Was expecting:
    "LOCAL" ...

How can I make my date-time format as the timestamp type? I'm running Flink 1.11.1 and executing sql using FlinkSQL CLI.

Thanks,
Youngwoo

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: Format for timestamp type in Flink SQL

godfrey he
Hi Youngwoo,

> 1. TIMESTAMP WITH LOCAL TIME ZONE
Currently, SQL client uses legacy types for the collect sink, that means `TIMESTAMP WITH LOCAL TIME ZONE` is not supported.
you can refer to [1] to find the supported types, and there is a pr [2] to fix this.

>2. TIMESTAMP(3) WITH LOCAL TIME ZONE
I do not reproduce the exception

> 3. TIMESTAMP WITH TIME ZONE and TIMESTAMP(3) WITH TIME ZONE

Best,
Godfrey

Youngwoo Kim (김영우) <[hidden email]> 于2020年8月16日周日 上午1:23写道:
Hi Benchao,

I include ['json.timestamp-format.standard' = 'ISO-8601'] to table's DDL but it does not work with slightly different errors:

1. TIMESTAMP WITH LOCAL TIME ZONE

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)

Caused by: org.apache.flink.table.api.TableException: Unsupported conversion from data type 'TIMESTAMP(6) WITH LOCAL TIME ZONE' (conversion class: java.time.Instant) to type information. Only data types that originated from type information fully support a reverse conversion.

at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:259)

at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)

at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)

at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)

at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)

at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)

at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)

at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)

at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.convertToRowTypeInfo(LegacyTypeInfoDataTypeConverter.java:329)

at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:237)

at org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)

at org.apache.flink.table.api.TableSchema.toRowType(TableSchema.java:271)

at org.apache.flink.table.client.gateway.local.result.CollectStreamResult.<init>(CollectStreamResult.java:71)

at org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.<init>(MaterializedCollectStreamResult.java:101)

at org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.<init>(MaterializedCollectStreamResult.java:129)

at org.apache.flink.table.client.gateway.local.ResultStore.createResult(ResultStore.java:83)

at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:608)

at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:465)

at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:555)

at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:311)

at java.util.Optional.ifPresent(Optional.java:159)

at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)

at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)

at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)


2. TIMESTAMP(3) WITH LOCAL TIME ZONE


[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Unsupported type: TIMESTAMP(3) WITH LOCAL TIME ZONE


3. TIMESTAMP WITH TIME ZONE and TIMESTAMP(3) WITH TIME ZONE


[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at line 3, column 32.
Was expecting:
    "LOCAL" ...



It looks like the timestamp 'yyyy-MM-ddTHH:mm:ss.SSSZ' is not supported in both 'SQL' and 'ISO-8601' format standard.

Just curious, what is the default format for timestamp type with a time zone?


Thanks,

Youngwoo



On Sat, Aug 15, 2020 at 8:16 PM Benchao Li <[hidden email]> wrote:
Hi Youngwoo,

What version of Flink and Json Format are you using?
From 1.11, we introduced  `json.timestamp-format.standard` to declare the timestamp format.
You can try `timestamp with local zone` data type with `ISO-8601` timestamp format.

Youngwoo Kim (김영우) <[hidden email]> 于2020年8月15日周六 下午12:12写道:
Hi,

I'm trying to create a table using Flink SQL to query from a Kafka topic. Messages from Kafka look like following:

(snip)
"svc_mgmt_num":"7749b6a7e17127d43431e21b94f4eb0c116..."
"log_ymdt":"2020-08-15T02:01:33.968Z"
"snapshot_dt":"2020-08-13"
"network_type":"LTE"

I'd like to make the column as 'TIMESTAMP' type but it does not work. Logs from TM:

Caused by: java.time.format.DateTimeParseException: Text '2020-08-15T00:01:29.003Z' could not be parsed at index 10
    at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) ~[?:1.8.0_265]
    at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777) ~[?:1.8.0_265]
......

And also, If I declare the column with TIMEZONE (... log_ymdt TIMESTAMP WITH TIME ZONE, ), the DDL does not work as well.

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at line 5, column 29.
Was expecting:
    "LOCAL" ...

How can I make my date-time format as the timestamp type? I'm running Flink 1.11.1 and executing sql using FlinkSQL CLI.

Thanks,
Youngwoo

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: Format for timestamp type in Flink SQL

Youngwoo Kim (김영우)
Thanks Godfrey for the detailed explanation.

Youngwoo

2020년 8월 19일 (수) 오후 12:47, godfrey he <[hidden email]>님이 작성:
Hi Youngwoo,

> 1. TIMESTAMP WITH LOCAL TIME ZONE
Currently, SQL client uses legacy types for the collect sink, that means `TIMESTAMP WITH LOCAL TIME ZONE` is not supported.
you can refer to [1] to find the supported types, and there is a pr [2] to fix this.

>2. TIMESTAMP(3) WITH LOCAL TIME ZONE
I do not reproduce the exception

> 3. TIMESTAMP WITH TIME ZONE and TIMESTAMP(3) WITH TIME ZONE

Best,
Godfrey

Youngwoo Kim (김영우) <[hidden email]> 于2020年8月16日周日 上午1:23写道:
Hi Benchao,

I include ['json.timestamp-format.standard' = 'ISO-8601'] to table's DDL but it does not work with slightly different errors:

1. TIMESTAMP WITH LOCAL TIME ZONE













Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.



at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)



Caused by: org.apache.flink.table.api.TableException: Unsupported conversion from data type 'TIMESTAMP(6) WITH LOCAL TIME ZONE' (conversion class: java.time.Instant) to type information. Only data types that originated from type information fully support a reverse conversion.



at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:259)



at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)



at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)



at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)



at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)



at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)



at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)



at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)



at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.convertToRowTypeInfo(LegacyTypeInfoDataTypeConverter.java:329)



at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:237)



at org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)



at org.apache.flink.table.api.TableSchema.toRowType(TableSchema.java:271)



at org.apache.flink.table.client.gateway.local.result.CollectStreamResult.<init>(CollectStreamResult.java:71)



at org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.<init>(MaterializedCollectStreamResult.java:101)



at org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.<init>(MaterializedCollectStreamResult.java:129)



at org.apache.flink.table.client.gateway.local.ResultStore.createResult(ResultStore.java:83)



at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:608)



at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:465)



at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:555)



at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:311)



at java.util.Optional.ifPresent(Optional.java:159)



at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)



at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)



at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)



at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)


2. TIMESTAMP(3) WITH LOCAL TIME ZONE


[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Unsupported type: TIMESTAMP(3) WITH LOCAL TIME ZONE


3. TIMESTAMP WITH TIME ZONE and TIMESTAMP(3) WITH TIME ZONE


[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at line 3, column 32.
Was expecting:
    "LOCAL" ...



























It looks like the timestamp 'yyyy-MM-ddTHH:mm:ss.SSSZ' is not supported in both 'SQL' and 'ISO-8601' format standard.

Just curious, what is the default format for timestamp type with a time zone?


Thanks,

Youngwoo







































On Sat, Aug 15, 2020 at 8:16 PM Benchao Li <[hidden email]> wrote:
Hi Youngwoo,

What version of Flink and Json Format are you using?
From 1.11, we introduced  `json.timestamp-format.standard` to declare the timestamp format.
You can try `timestamp with local zone` data type with `ISO-8601` timestamp format.

Youngwoo Kim (김영우) <[hidden email]> 于2020年8月15日周六 下午12:12写道:
Hi,

I'm trying to create a table using Flink SQL to query from a Kafka topic. Messages from Kafka look like following:

(snip)
"svc_mgmt_num":"7749b6a7e17127d43431e21b94f4eb0c116..."
"log_ymdt":"2020-08-15T02:01:33.968Z"
"snapshot_dt":"2020-08-13"
"network_type":"LTE"

I'd like to make the column as 'TIMESTAMP' type but it does not work. Logs from TM:

Caused by: java.time.format.DateTimeParseException: Text '2020-08-15T00:01:29.003Z' could not be parsed at index 10
    at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) ~[?:1.8.0_265]
    at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777) ~[?:1.8.0_265]
......

And also, If I declare the column with TIMEZONE (... log_ymdt TIMESTAMP WITH TIME ZONE, ), the DDL does not work as well.

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at line 5, column 29.
Was expecting:
    "LOCAL" ...

How can I make my date-time format as the timestamp type? I'm running Flink 1.11.1 and executing sql using FlinkSQL CLI.

Thanks,
Youngwoo

Best,
Benchao Li