Re: can flink sql handle udf-generated timestamp field

Posted by Yu Yang on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/can-flink-sql-handle-udf-generated-timestamp-field-tp28127p28137.html

+flink-user

On Wed, Jun 5, 2019 at 9:58 AM Yu Yang <[hidden email]> wrote:
Thanks for the reply! In flink-table-planner, TimeIndicatorTypeInfo is an internal class that cannot be referenced from application code. I got a "cannot find symbol" error when I tried to use it. I also tried using "SqlTimeTypeInfo.getInfoFor(Timestamp.class)" as the result type of my UDF. With that, I got the same "Window can only be defined over a time attribute column" error as before.

On Wed, Jun 5, 2019 at 4:41 AM Lee tinker <[hidden email]> wrote:
Hi Yu Yang:
When you want to use a time field in a window, that field must have a type Flink recognizes as a time attribute. Your UDF returns Types.SQL_TIMESTAMP. Instead, it should return TimeIndicatorTypeInfo.PROCTIME_INDICATOR or TimeIndicatorTypeInfo.ROWTIME_INDICATOR, depending on whether you use processing time or event time (rowtime). You can try again with that.

Yu Yang <[hidden email]> wrote on Wednesday, June 5, 2019 at 2:57 PM:
Hi, 

I am trying to use Flink SQL to aggregate over a hopping window. In the data stream, we store the timestamp as a long, so I wrote a UDF, 'FROM_UNIXTIME', to convert the long value to a Timestamp.

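  import java.sql.Timestamp;
  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.table.functions.ScalarFunction;

  // UDF used as FROM_UNIXTIME: converts epoch milliseconds (long) to java.sql.Timestamp.
  public static class TimestampModifier extends ScalarFunction {
    public Timestamp eval(long t) {
      return new Timestamp(t);
    }
    // Declares SQL TIMESTAMP as the UDF's result type.
    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
      return Types.SQL_TIMESTAMP;
    }
  }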

With the above UDF, I wrote the following query and ran into "ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column".
Any suggestions on how to resolve this issue? I am using Flink 1.8 for this experiment.

My SQL query:

select keyid, sum(value)
from (
  select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
  from orders)
group by HOP(ts, INTERVAL '10' second, INTERVAL '30' second), keyid
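In HOP(ts, INTERVAL '10' second, INTERVAL '30' second), the first interval is the slide and the second is the window size, so every row belongs to 30 / 10 = 3 overlapping windows. The plain-Java sketch below (no Flink dependency; the class and method names are hypothetical) computes which windows an epoch-millisecond timestamp falls into, assuming windows are aligned to the epoch with no offset. It only illustrates the window arithmetic. It is not Flink's API and does not by itself address the time-attribute error.

```java
import java.util.ArrayList;
import java.util.List;

public class HopWindowSketch {

    // Returns the [start, end) bounds, in epoch milliseconds, of every hopping
    // window that contains tsMillis. Assumption: windows are aligned to the
    // epoch with no offset.
    static List<long[]> assignHopWindows(long tsMillis, long slideMillis, long sizeMillis) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that begins at or before tsMillis.
        long lastStart = tsMillis - Math.floorMod(tsMillis, slideMillis);
        // Walk back one slide at a time while the window still covers tsMillis.
        for (long start = lastStart; start > tsMillis - sizeMillis; start -= slideMillis) {
            windows.add(new long[] {start, start + sizeMillis});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Slide 10 s and size 30 s, matching the HOP clause in the query.
        long ts = 1_559_700_012_345L;
        for (long[] w : assignHopWindows(ts, 10_000L, 30_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

Running main prints three overlapping 30-second windows whose start times are 10 seconds apart, each of which contains the sample timestamp.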

Flink exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)

Regards, 
-Yu