|
Hi,
You can implement TopN on SQL/Table-api or write a datastream job with ProcessFunction to solve the problem.
Best, Hequn Hi Hequn, If limit n is not supported in streaming, how to solve top n problem in stream scenario?
Best Henry
Hi Henry,
Currently, Order By is supported in Streaming&Batch while Limit is only supported in Batch. Another thing to be noted is, for Order by, the result of streaming queries must be primarily sorted on an ascending time attribute[1].
Hi, I want a top n result on each hop window result, but some error throws out when I add the order by sentence or the limit sentence, so how do I implement such case ? Thanks a lot.
trackId as id,track_title as description, count(*) as cnt appName='play.statistics.trace' HOP(started_at_ts, INTERVAL '1' SECOND, INTERVAL '5' MINUTE),trackId,track_title
FlinkLogicalSort(sort0=[$2], dir0=[DESC]) FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()]) FlinkLogicalCalc(expr#0..4=[{inputs}], expr#5=[_UTF-16LE'play.statistics.trace'], expr#6=[=($t0, $t5)], started_at_ts=[$t4], trackId=[$t1], track_title=[$t2], $condition=[$t6]) FlinkLogicalNativeTableScan(table=[[play]]) This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features. at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:275) at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:845) at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:892) at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344) at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:786) at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:723) at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
Best Henry
|