Flink Table API and Date fields

Flink Table API and Date fields

Flavio Pompermaier
Hi to all,
in my use case I have a stream of POJOs with Date fields.
When I use the Table API I get the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. Type is not supported: Date
at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:112)
at org.apache.flink.table.planner.StreamPlanner.toRel(StreamPlanner.scala:148)
at org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:114)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:268)
Caused by: org.apache.flink.table.api.TableException: Type is not supported: Date
at org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:357)


Is there a way to deal with this without converting the Date field to a Long one?

Best,
Flavio
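
For reference, a minimal sketch of the kind of job that hits this exception with the Flink 1.9-era Table API; the Event class and its fields are hypothetical, the only detail that matters is the java.util.Date field:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class DateFieldRepro {

    // Plain POJO with a java.util.Date field, as described above.
    public static class Event {
        public String id;
        public java.util.Date eventTime;

        public Event() {}

        public Event(String id, java.util.Date eventTime) {
            this.id = id;
            this.eventTime = eventTime;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Event> events = env.fromElements(new Event("a", new java.util.Date()));
        tEnv.registerDataStream("events", events);

        // SQL validation fails here with "Type is not supported: Date".
        Table result = tEnv.sqlQuery("SELECT id, eventTime FROM events");
        result.printSchema();
    }
}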

Re: Flink Table API and Date fields

JingsongLee
Hi Flavio,
It looks like you are using java.util.Date in your POJO. At the moment the Flink Table API does not support BasicTypeInfo.DATE_TYPE_INFO because of limitations in some of the type checks in the code.
Can you use java.sql.Date instead?

Best, JingsongLee
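
A sketch of what this suggestion could look like on the hypothetical Event POJO from the earlier sketch: switch the field to java.sql.Date (which the planner maps to SQL DATE) and re-wrap the epoch milliseconds when the value originates as a java.util.Date:

// Hypothetical POJO with the date field switched to java.sql.Date.
public class Event {
    public String id;
    public java.sql.Date eventDate;

    public Event() {}

    public Event(String id, java.util.Date source) {
        this.id = id;
        // java.sql.Date carries the same epoch-millisecond value.
        this.eventDate = source == null ? null : new java.sql.Date(source.getTime());
    }
}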


Re: Flink Table API and Date fields

Flavio Pompermaier
I think I could do that for this specific use case, but isn't this a big limitation of the Table API?
I think that java.util.Date should be a first-class citizen in Flink.

Best,
Flavio


Re: Flink Table API and Date fields

JingsongLee
The Flink 1.9 Blink runner will support it as a generic type,
but I don't recommend that. After all, Java already has java.sql.Date and java.time.*.

Best, JingsongLee


Re: Flink Table API and Date fields

Flavio Pompermaier
Of course Java has java.sql.* and java.time.*, but it's also true that most of the time the POJOs you read come from an external (Maven) library, and if those POJOs contain date fields you have to create a local copy of each POJO with its java.util.Date fields replaced by java.sql.Date ones.
Moreover, you also have to write a conversion function from the original POJO to the Flink-specific one (and this is very annoying, especially because if the POJO gets modified you have to make sure the conversion function is updated accordingly).

Summarising, it is possible to work around this limitation, but it's very uncomfortable (IMHO).
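
A rough sketch of the glue code described above, assuming hypothetical ExternalEvent (from the third-party library, exposing java.util.Date getters) and LocalEvent (the hand-written copy with public java.sql.Date fields) classes:

import org.apache.flink.api.common.functions.MapFunction;

// Converts the library POJO into the Flink-friendly copy. Every field copied here
// must be kept in sync by hand whenever ExternalEvent changes.
public class ExternalToLocalEvent implements MapFunction<ExternalEvent, LocalEvent> {
    @Override
    public LocalEvent map(ExternalEvent in) {
        LocalEvent out = new LocalEvent();
        out.id = in.getId();
        out.eventDate = in.getEventDate() == null
                ? null
                : new java.sql.Date(in.getEventDate().getTime()); // util.Date -> sql.Date
        return out;
    }
}

It would be wired in with something like externalStream.map(new ExternalToLocalEvent()) before the stream is registered as a table.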


Re: Flink Table API and Date fields

Timo Walther
Hi Flavio,

yes, I agree. This check is a bit confusing. The initial reason for it was that sql.Time, sql.Date, and sql.Timestamp extend from util.Date as well. But handling it as a generic type, as Jingsong mentioned, might be the better option, so that custom UDFs can be written to handle such fields.

Regards,
Timo
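
A possible sketch of the custom-UDF route mentioned here, assuming the java.util.Date field is carried through as a generic type and converted explicitly before being used as a SQL DATE (class, function, and registration names are illustrative, Flink 1.9-era API):

import org.apache.flink.table.functions.ScalarFunction;

// Scalar UDF that turns a java.util.Date (handled as a generic type) into a
// java.sql.Date that the Table planner understands.
public class ToSqlDate extends ScalarFunction {
    public java.sql.Date eval(java.util.Date value) {
        return value == null ? null : new java.sql.Date(value.getTime());
    }
}

It would then be registered and used along the lines of tableEnv.registerFunction("toSqlDate", new ToSqlDate()) followed by SELECT toSqlDate(eventTime) FROM events.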


Re: Flink Table API and Date fields

Rong Rong
Hi Flavio,

Yes, I think the handling of date/time in Flink could be better when dealing with DATE/TIME types.
There are several limitations of java.util.Date that Jingsong briefly mentioned. Some of these limitations even affect the correctness of results (e.g. the Gregorian vs. Julian calendar), and java.sql.Date is broadly used in Flink at the moment.

I think handling it as a completely different type, either as a generic type or as another extension of the basic types, would definitely be helpful here. One important reason is that Flink could then prevent sql.Date functions from mistakenly being applied to util.Date values.

--
Rong
