Hi,

I am trying to use Flink SQL to do aggregation on a hopping window. In the data stream, we store the timestamp in long type. So I wrote a UDF 'FROM_UNIXTIME' to convert long to Timestamp type.

    public static class TimestampModifier extends ScalarFunction {
        public Timestamp eval(long t) {
            return new Timestamp(t);
        }

        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return Types.SQL_TIMESTAMP;
        }
    }

With the above UDF, I wrote the following query, and ran into "ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column". Any suggestions on how to resolve this issue? I am using Flink 1.8 for this experiment.

My SQL query:

    select keyid, sum(value)
    from (
        select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
        from orders)
    group by HOP(ts, INTERVAL '10' second, INTERVAL '30' second), keyid

Flink exception:

    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
    Caused by: org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
        at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
        at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
        at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
        at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
        at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)

Regards,
-Yu
|
Hi @Yu Yang,

Time-based operations such as windows in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer logical time attributes for indicating time and accessing corresponding timestamps in table programs. [1]

This means a window can only be defined over a time attribute column. A Timestamp value returned by a UDF is an ordinary column, not a time attribute. You need to define a rowtime attribute in your source, for example on a field such as UserActionTime (a long field; you don't need to convert it to a Timestamp).

See the document referenced as [1] for more information.

Best,
JingsongLee
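
As a side illustration (not part of the original reply), the arithmetic below suggests why a raw long in epoch milliseconds is enough for event-time windowing once it is declared as a rowtime attribute. It is a stand-alone sketch in plain Java with no Flink dependency; the class and method names are hypothetical, and the logic mirrors the usual sliding-window assignment rather than quoting Flink's implementation. It assumes the query's HOP(ts, INTERVAL '10' second, INTERVAL '30' second) means a 10-second slide and a 30-second window size.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone illustration of hopping-window assignment; hypothetical names, not Flink code. */
final class HoppingWindowSketch {

    /** Half-open window [start, end) in epoch milliseconds. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Returns every window of length sizeMs, advancing by slideMs, that contains timestampMs. */
    static List<Window> assign(long timestampMs, long slideMs, long sizeMs) {
        List<Window> windows = new ArrayList<>();
        // Start of the latest window that begins at or before the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new Window(start, start + sizeMs));
        }
        return windows;
    }

    public static void main(String[] args) {
        // Slide 10 s, size 30 s: an event at 25 s belongs to three overlapping windows.
        for (Window w : assign(25_000L, 10_000L, 30_000L)) {
            System.out.println(w);
        }
        // prints [20000, 50000), [10000, 40000), [0, 30000) on separate lines
    }
}
```

No conversion to java.sql.Timestamp appears anywhere in this calculation; the window boundaries are derived from the long value alone.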
|
+flink-user

On Wed, Jun 5, 2019 at 9:58 AM Yu Yang <[hidden email]> wrote:
|
Hi Jingsong,

Thanks for the reply! The following is our code snippet for creating the log stream. Our messages are in Thrift format. We use a customized serializer for serializing/deserializing messages (see https://github.com/apache/flink/pull/8067 for the implementation). Given that, how should we define a time attribute column? We'd like to rely on the customized serializer to figure out column names as much as possible.

    ThriftDeserializationSchema deserializationSchema = new ThriftDeserializationSchema(CustomerOrders.class, ThriftCodeGenerator.SCROOGE);
    FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer("customer_orders", deserializationSchema, m10n05Properties);
    tableEnv.registerDataStream("orders", kafkaConsumer);

Regards,
-Yu

On Wed, Jun 5, 2019 at 11:15 PM JingsongLee <[hidden email]> wrote:
|
Hi Yu,

When you register a DataStream as a Table, you can create a new attribute that contains the event timestamp of the DataStream records. For that, you need to assign timestamps and generate watermarks before registering the stream:

    FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer("customer_orders", deserializationSchema, m10n05Properties);

    // create DataStream from Kafka consumer
    DataStream<CustomerOrders> orders = env.addSource(kafkaConsumer);

    // assign timestamps with a custom timestamp assigner & watermark generator
    DataStream<CustomerOrders> ordersWithTS = orders.assignTimestampsAndWatermarks(new YourTimestampAssigner());

    // register DataStream as Table with ts as timestamp, which is automatically extracted
    // (see [1] for how to map POJO fields and [2] for timestamps)
    tableEnv.registerDataStream("custom_orders", ordersWithTS, "userName, ..., ts.rowtime");

Hope this helps,
Fabian

On Thu, Jun 6, 2019 at 08:48, Yu Yang <[hidden email]> wrote:
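
For readers wondering what a class like YourTimestampAssigner has to compute, here is a stand-alone sketch in plain Java (no Flink dependency) of the common bounded-out-of-orderness strategy: the record's long timestamp is used as event time, and the watermark trails the largest timestamp seen so far by a fixed bound. The class name, method names, and the 5-second bound are hypothetical and chosen only for illustration. In Flink 1.8 the equivalent logic would typically live in an implementation of AssignerWithPeriodicWatermarks or a subclass of BoundedOutOfOrdernessTimestampExtractor, reading a field such as timestampMs from the record.

```java
/** Stand-alone illustration of bounded-out-of-orderness watermarking; hypothetical names, not Flink code. */
final class BoundedLatenessSketch {

    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestampMs;

    BoundedLatenessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Offset the initial value so the first watermark computation cannot underflow.
        this.maxSeenTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Returns the record's event time and remembers the largest timestamp seen so far. */
    long extractTimestamp(long recordTimestampMs) {
        maxSeenTimestampMs = Math.max(maxSeenTimestampMs, recordTimestampMs);
        return recordTimestampMs;
    }

    /** Watermark: no record older than this value is expected to arrive anymore. */
    long currentWatermark() {
        return maxSeenTimestampMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        // Assumed bound of 5 seconds; pick a value that matches how late your events can be.
        BoundedLatenessSketch assigner = new BoundedLatenessSketch(5_000L);
        long[] timestampsMs = {10_000L, 8_000L, 20_000L};
        for (long ts : timestampsMs) {
            assigner.extractTimestamp(ts);
            System.out.println("event=" + ts + " watermark=" + assigner.currentWatermark());
        }
    }
}
```

The out-of-order event at 8 s does not move the watermark backwards, which is the property that lets windows close deterministically.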
|
Thank you, Fabian! We will try the approach that you suggest.

On Thu, Jun 6, 2019 at 1:03 AM Fabian Hueske <[hidden email]> wrote:
|