Exception in thread "main" org.apache.flink.table.api.TableException: Group Window Aggregate: Retraction on windowed GroupBy Aggregate is not supported yet.

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Exception in thread "main" org.apache.flink.table.api.TableException: Group Window Aggregate: Retraction on windowed GroupBy Aggregate is not supported yet.

liujiangang
      I am using two sequence windows in SQL as following:

SELECT TUMBLE_START(rowtime, interval '1' minute) AS windowStart, bitmapUnion(bmp) AS bmp
FROM (SELECT TUMBLE_ROWTIME(eventTime, interval '1' minute) AS rowtime, bitmap(id) AS bmp
FROM person
GROUP BY TUMBLE(eventTime, interval '1' minute), MOD(hashCode(id), 1024)
)
GROUP BY TUMBLE(rowtime, interval '1' minute)


      It is ok to use flink planner. When I switch to blink planner, it reports the following error. Can anyone help me? Thank you.

Exception in thread "main" org.apache.flink.table.api.TableException: Group Window Aggregate: Retraction on windowed GroupBy Aggregate is not supported yet. 
please re-check sql grammar. 
Note: Windowed GroupBy Aggregate should not follow anon-windowed GroupBy aggregation.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:134)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlanInternal(StreamExecExpand.scala:82)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlanInternal(StreamExecExpand.scala:42)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlan(StreamExecExpand.scala:42)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:139)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:55)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlan(StreamExecGroupAggregate.scala:55)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:296)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:287)
at com.kuaishou.flink.hdp.demos.statistics.sql.GroupingSetsUV.main(GroupingSetsUV.java:79)



Reply | Threaded
Open this post in threaded view
|

Re: Exception in thread "main" org.apache.flink.table.api.TableException: Group Window Aggregate: Retraction on windowed GroupBy Aggregate is not supported yet.

Benchao Li
Hi,

Did you set "fast-emit" for your query?
If yes, the exception is by-design. Because emit will change the output of windowed aggregate from append to retract.
There is an open issue about this[1].


刘建刚 <[hidden email]> 于2020年4月15日周三 下午7:07写道:
      I am using two sequence windows in SQL as following:

SELECT TUMBLE_START(rowtime, interval '1' minute) AS windowStart, bitmapUnion(bmp) AS bmp
FROM (SELECT TUMBLE_ROWTIME(eventTime, interval '1' minute) AS rowtime, bitmap(id) AS bmp
FROM person
GROUP BY TUMBLE(eventTime, interval '1' minute), MOD(hashCode(id), 1024)
)
GROUP BY TUMBLE(rowtime, interval '1' minute)


      It is ok to use flink planner. When I switch to blink planner, it reports the following error. Can anyone help me? Thank you.

Exception in thread "main" org.apache.flink.table.api.TableException: Group Window Aggregate: Retraction on windowed GroupBy Aggregate is not supported yet. 
please re-check sql grammar. 
Note: Windowed GroupBy Aggregate should not follow anon-windowed GroupBy aggregation.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:134)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlanInternal(StreamExecExpand.scala:82)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlanInternal(StreamExecExpand.scala:42)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlan(StreamExecExpand.scala:42)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:139)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:55)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlan(StreamExecGroupAggregate.scala:55)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:296)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:287)
at com.kuaishou.flink.hdp.demos.statistics.sql.GroupingSetsUV.main(GroupingSetsUV.java:79)





--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Exception in thread "main" org.apache.flink.table.api.TableException: Group Window Aggregate: Retraction on windowed GroupBy Aggregate is not supported yet.

liujiangang
No ,I do not use "fast-emit”. Another group by is combined with this SQL. I use “tableConfig.setIdleStateRetentionTime()” to control idled state. If I delete “tableConfig.setIdleStateRetentionTime()” in blink, the error disappears. How can I resolve it? Thank you.

2020年4月15日 下午8:11,Benchao Li [via Apache Flink User Mailing List archive.] <[hidden email]> 写道:

Hi,

Did you set "fast-emit" for your query?
If yes, the exception is by-design. Because emit will change the output of windowed aggregate from append to retract.
There is an open issue about this[1].


刘建刚 <<a href="x-msg://11/user/SendEmail.jtp?type=node&amp;node=34334&amp;i=0" target="_top" rel="nofollow" link="external" class="">[hidden email]> 于2020年4月15日周三 下午7:07写道:
      I am using two sequence windows in SQL as following:

SELECT TUMBLE_START(rowtime, interval '1' minute) AS windowStart, bitmapUnion(bmp) AS bmp
FROM (SELECT TUMBLE_ROWTIME(eventTime, interval '1' minute) AS rowtime, bitmap(id) AS bmp
FROM person
GROUP BY TUMBLE(eventTime, interval '1' minute), MOD(hashCode(id), 1024)
)
GROUP BY TUMBLE(rowtime, interval '1' minute)


      It is ok to use flink planner. When I switch to blink planner, it reports the following error. Can anyone help me? Thank you.

Exception in thread "main" org.apache.flink.table.api.TableException: Group Window Aggregate: Retraction on windowed GroupBy Aggregate is not supported yet. 
please re-check sql grammar. 
Note: Windowed GroupBy Aggregate should not follow anon-windowed GroupBy aggregation.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:134)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlanInternal(StreamExecExpand.scala:82)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlanInternal(StreamExecExpand.scala:42)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlan(StreamExecExpand.scala:42)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:139)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:55)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlan(StreamExecGroupAggregate.scala:55)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:296)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:287)
at com.kuaishou.flink.hdp.demos.statistics.sql.GroupingSetsUV.main(GroupingSetsUV.java:79)





--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: <a href="x-msg://11/user/SendEmail.jtp?type=node&amp;node=34334&amp;i=1" target="_top" rel="nofollow" link="external" class="">[hidden email]; <a href="x-msg://11/user/SendEmail.jtp?type=node&amp;node=34334&amp;i=2" target="_top" rel="nofollow" link="external" class="">[hidden email]



To start a new topic under Apache Flink User Mailing List archive., [hidden email]
To unsubscribe from Apache Flink User Mailing List archive., click here.
NAML

Reply | Threaded
Open this post in threaded view
|

Re: Exception in thread "main" org.apache.flink.table.api.TableException: Group Window Aggregate: Retraction on windowed GroupBy Aggregate is not supported yet.

Benchao Li
Hi,

In blink planner, if you set retention time, it means that you enabled late records handling in WindowOperator.
It also changes the output of WindowOperator from append to retract.

刘建刚 <[hidden email]> 于2020年4月16日周四 上午8:40写道:
No ,I do not use "fast-emit”. Another group by is combined with this SQL. I use “tableConfig.setIdleStateRetentionTime()” to control idled state. If I delete “tableConfig.setIdleStateRetentionTime()” in blink, the error disappears. How can I resolve it? Thank you.

2020年4月15日 下午8:11,Benchao Li [via Apache Flink User Mailing List archive.] <[hidden email]> 写道:

Hi,

Did you set "fast-emit" for your query?
If yes, the exception is by-design. Because emit will change the output of windowed aggregate from append to retract.
There is an open issue about this[1].


刘建刚 <[hidden email]> 于2020年4月15日周三 下午7:07写道:
      I am using two sequence windows in SQL as following:

SELECT TUMBLE_START(rowtime, interval '1' minute) AS windowStart, bitmapUnion(bmp) AS bmp
FROM (SELECT TUMBLE_ROWTIME(eventTime, interval '1' minute) AS rowtime, bitmap(id) AS bmp
FROM person
GROUP BY TUMBLE(eventTime, interval '1' minute), MOD(hashCode(id), 1024)
)
GROUP BY TUMBLE(rowtime, interval '1' minute)


      It is ok to use flink planner. When I switch to blink planner, it reports the following error. Can anyone help me? Thank you.

Exception in thread "main" org.apache.flink.table.api.TableException: Group Window Aggregate: Retraction on windowed GroupBy Aggregate is not supported yet. 
please re-check sql grammar. 
Note: Windowed GroupBy Aggregate should not follow anon-windowed GroupBy aggregation.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:134)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlanInternal(StreamExecExpand.scala:82)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlanInternal(StreamExecExpand.scala:42)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlan(StreamExecExpand.scala:42)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:139)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:55)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlan(StreamExecGroupAggregate.scala:55)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:296)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:287)
at com.kuaishou.flink.hdp.demos.statistics.sql.GroupingSetsUV.main(GroupingSetsUV.java:79)





--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]



To start a new topic under Apache Flink User Mailing List archive., [hidden email]
To unsubscribe from Apache Flink User Mailing List archive., click here.
NAML



--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Exception in thread "main" org.apache.flink.table.api.TableException: Group Window Aggregate: Retraction on windowed GroupBy Aggregate is not supported yet.

liujiangang
Thank you. I will use flink planner first and have a look at the detail code.

2020年4月16日 上午10:17,Benchao Li <[hidden email]> 写道:

Hi,

In blink planner, if you set retention time, it means that you enabled late records handling in WindowOperator.
It also changes the output of WindowOperator from append to retract.

刘建刚 <[hidden email]> 于2020年4月16日周四 上午8:40写道:
No ,I do not use "fast-emit”. Another group by is combined with this SQL. I use “tableConfig.setIdleStateRetentionTime()” to control idled state. If I delete “tableConfig.setIdleStateRetentionTime()” in blink, the error disappears. How can I resolve it? Thank you.

2020年4月15日 下午8:11,Benchao Li [via Apache Flink User Mailing List archive.] <[hidden email]> 写道:

Hi,

Did you set "fast-emit" for your query?
If yes, the exception is by-design. Because emit will change the output of windowed aggregate from append to retract.
There is an open issue about this[1].


刘建刚 <[hidden email]> 于2020年4月15日周三 下午7:07写道:
      I am using two sequence windows in SQL as following:

SELECT TUMBLE_START(rowtime, interval '1' minute) AS windowStart, bitmapUnion(bmp) AS bmp
FROM (SELECT TUMBLE_ROWTIME(eventTime, interval '1' minute) AS rowtime, bitmap(id) AS bmp
FROM person
GROUP BY TUMBLE(eventTime, interval '1' minute), MOD(hashCode(id), 1024)
)
GROUP BY TUMBLE(rowtime, interval '1' minute)


      It is ok to use flink planner. When I switch to blink planner, it reports the following error. Can anyone help me? Thank you.

Exception in thread "main" org.apache.flink.table.api.TableException: Group Window Aggregate: Retraction on windowed GroupBy Aggregate is not supported yet. 
please re-check sql grammar. 
Note: Windowed GroupBy Aggregate should not follow anon-windowed GroupBy aggregation.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:134)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlanInternal(StreamExecExpand.scala:82)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlanInternal(StreamExecExpand.scala:42)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExpand.translateToPlan(StreamExecExpand.scala:42)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:139)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:55)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlan(StreamExecGroupAggregate.scala:55)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:296)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:287)
at com.kuaishou.flink.hdp.demos.statistics.sql.GroupingSetsUV.main(GroupingSetsUV.java:79)





--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]



To start a new topic under Apache Flink User Mailing List archive., [hidden email]
To unsubscribe from Apache Flink User Mailing List archive., click here.
NAML



--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]