LIMIT and ORDER BY in hop window is not supported?

徐涛
Hi,
I want the top-N results for each hop window, but an error is thrown when I add the ORDER BY clause or the LIMIT clause. How can I implement this?
Thanks a lot.

SELECT
    trackId AS id, track_title AS description, COUNT(*) AS cnt
FROM
    play
WHERE
    appName = 'play.statistics.trace'
GROUP BY
    HOP(started_at_ts, INTERVAL '1' SECOND, INTERVAL '5' MINUTE), trackId, track_title
ORDER BY
    cnt DESC
LIMIT 10

FlinkLogicalSort(sort0=[$2], dir0=[DESC])
  FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()])
    FlinkLogicalCalc(expr#0..4=[{inputs}], expr#5=[_UTF-16LE'play.statistics.trace'], expr#6=[=($t0, $t5)], started_at_ts=[$t4], trackId=[$t1], track_title=[$t2], $condition=[$t6])
      FlinkLogicalNativeTableScan(table=[[play]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:275)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:845)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:892)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:786)
at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:723)
at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)

Best
Henry

Re: LIMIT and ORDER BY in hop window is not supported?

Hequn Cheng
Hi Henry,

Currently, ORDER BY is supported in both streaming and batch queries, while LIMIT is only supported in batch queries. Also note that for ORDER BY, the result of a streaming query must be primarily sorted on an ascending time attribute [1].
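To illustrate why the ascending time attribute is required, here is a minimal Python sketch (not Flink code; `RowtimeSortBuffer` and its methods are hypothetical names) of a streaming sort on `(rowtime ASC, cnt DESC)`. It assumes that a watermark `w` guarantees no further rows with a timestamp `<= w` will arrive (no late data). Rows are buffered and released only once the watermark has passed their time. A sort on `cnt DESC` alone could never emit anything in an append-only stream, because a later row might always carry a higher count.

```python
import heapq


class RowtimeSortBuffer:
    """Hypothetical streaming sort on (rowtime ASC, cnt DESC).

    Rows are buffered until the watermark passes their rowtime; assuming no late
    data, only then is it certain that no row with an earlier or equal timestamp
    can still arrive.
    """

    def __init__(self):
        self._heap = []  # entries: (rowtime, -cnt, arrival_seq, row)
        self._seq = 0    # tie-breaker so the row payloads are never compared

    def add(self, rowtime, cnt, row):
        heapq.heappush(self._heap, (rowtime, -cnt, self._seq, row))
        self._seq += 1

    def on_watermark(self, watermark):
        """Emit, in sorted order, every buffered row with rowtime <= watermark."""
        out = []
        while self._heap and self._heap[0][0] <= watermark:
            out.append(heapq.heappop(self._heap)[3])
        return out


if __name__ == "__main__":
    buf = RowtimeSortBuffer()
    buf.add(2, 5, "b")
    buf.add(1, 3, "a")
    buf.add(1, 7, "c")
    buf.add(3, 1, "d")
    print(buf.on_watermark(2))  # ['c', 'a', 'b']
    print(buf.on_watermark(3))  # ['d']
```

The secondary key (`cnt DESC`) only orders rows that share the same timestamp; the time attribute is what lets the operator decide when a prefix of the output is final.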

On Thu, Sep 27, 2018 at 9:05 PM 徐涛 <[hidden email]> wrote:

Re: LIMIT and ORDER BY in hop window is not supported?

徐涛
Hi Hequn,
If LIMIT n is not supported in streaming, how can I solve the top-N problem in a streaming scenario?

Best
Henry

On Sep 28, 2018, at 12:03 AM, Hequn Cheng <[hidden email]> wrote:

Re: LIMIT and ORDER BY in hop window is not supported?

Hequn Cheng
Hi,

You can implement top-N on top of the SQL/Table API, or write a DataStream job with a ProcessFunction to solve the problem.
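A minimal Python sketch of the two steps such a job would perform, written outside of Flink so the logic is easy to check. Step 1 counts rows per (hop window, key), which is what the `GROUP BY HOP(...)` in the original query does. Step 2 keeps the N highest counts per window; in a DataStream job, one possible design (an assumption, not something stated in this thread) is a ProcessFunction keyed by window that emits the top N once the window is complete. The function names are hypothetical, and the sketch assumes integer timestamps and a window size that is a multiple of the slide; it ignores watermarks and late data.

```python
from collections import Counter, defaultdict


def hop_windows(ts, slide, size):
    """Start times of every hop window [start, start + size) that contains ts.

    Assumes integer timestamps and size % slide == 0 (e.g. 1 s slide, 5 min size).
    """
    assert size % slide == 0, "sketch assumes size is a multiple of slide"
    last_start = ts - ts % slide  # latest window start at or before ts
    # size // slide windows contain ts; the earliest starts size - slide before the latest
    return list(range(last_start - size + slide, last_start + 1, slide))


def top_n_per_hop_window(events, slide, size, n):
    """events: iterable of (ts, key). Returns {window_start: [(key, cnt), ...]}."""
    # Step 1: COUNT(*) per (window, key), like GROUP BY HOP(...), trackId
    counts = defaultdict(Counter)
    for ts, key in events:
        for start in hop_windows(ts, slide, size):
            counts[start][key] += 1

    # Step 2: per window, keep the n highest counts (ties broken by key)
    return {
        start: sorted(per_key.items(), key=lambda kv: (-kv[1], kv[0]))[:n]
        for start, per_key in counts.items()
    }


if __name__ == "__main__":
    plays = [(0, "a"), (1, "a"), (1, "b"), (2, "c"), (2, "c"), (2, "c")]
    for start, top in sorted(top_n_per_hop_window(plays, slide=1, size=2, n=2).items()):
        print(start, top)
```

Note that with a 1-second slide and a 5-minute size, every row lands in 300 overlapping windows, so step 1 does 300 updates per row; that cost is inherent to the query's window definition, not to the top-N step.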

Best, Hequn

On Fri, Sep 28, 2018 at 9:38 AM 徐涛 <[hidden email]> wrote: