Unknown call expression: avg(amount) when use distinct() in Flink,Thanks~!

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Unknown call expression: avg(amount) when use distinct() in Flink,Thanks~!

Appleyuchi
I'm testing
<a href="https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html
the" _src="https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html
the">https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html
the  part
"Distinct aggregation on over window"
(Ctrl+f and search the whole above string in above link please)
test code distinctaggregation3.java <a href="https://paste.ubuntu.com/p/7HJ9W3hVVN/ POJO" _src="https://paste.ubuntu.com/p/7HJ9W3hVVN/ POJO">https://paste.ubuntu.com/p/7HJ9W3hVVN/ POJO needed : OrderStream.java <a href="https://paste.ubuntu.com/p/f8msWgtzft/ Exception" _src="https://paste.ubuntu.com/p/f8msWgtzft/ Exception">https://paste.ubuntu.com/p/f8msWgtzft/ Exception stack is: Exception in thread "main" java.lang.RuntimeException: Unknown call expression: avg(amount) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:102) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72) at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.planner.expressions.converter.OverConvertRule.convert(OverConvertRule.java:81) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72) at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226) at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convertAs(CustomizedConvertRule.java:275) at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.lambda$convert$0(CustomizedConvertRule.java:95) at java.util.Optional.map(Optional.java:215) at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convert(CustomizedConvertRule.java:95) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97) at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72) at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122) at org.apache.flink.table.planner.plan.QueryOperationConverter.convertExprToRexNode(QueryOperationConverter.java:741) at org.apache.flink.table.planner.plan.QueryOperationConverter.access$800(QueryOperationConverter.java:132) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.lambda$convertToRexNodes$6(QueryOperationConverter.java:547) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.convertToRexNodes(QueryOperationConverter.java:548) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:156) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:152) at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:149) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:131) at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47) at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75) at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:165) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247) at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:331) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:307) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:298) at DistinctAggregation3.main(DistinctAggregation3.java:92)
the code is extended from the above part in above official link
Is the official document wrong?

Thanks for your help.




 

Reply | Threaded
Open this post in threaded view
|

Re: Unknown call expression: avg(amount) when use distinct() in Flink,Thanks~!

Timo Walther
Hi,

I'm assuming you are using Flink 1.12?

The exception indicates that something is definitely going wrong with
the translation from Table API to optimizer nodes. We refactored a lot
of this code in this region. I investogate the issue and come back to
you once I opended a ticket.

Thanks for reporting it.

Regards,
Timo

On 22.01.21 14:41, Appleyuchi wrote:

> I'm testing
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html 
> the  <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html
> the>   part
> "Distinct aggregation on over window"
> (Ctrl+f and search the whole above string in above link please)
>
>
>
>
> test code
> distinctaggregation3.java
> https://paste.ubuntu.com/p/7HJ9W3hVVN/ POJO  <https://paste.ubuntu.com/p/7HJ9W3hVVN/
> POJO>  needed :
> OrderStream.java
> https://paste.ubuntu.com/p/f8msWgtzft/ Exception  <https://paste.ubuntu.com/p/f8msWgtzft/
>
>
> Exception>  stack is:
> Exception in thread "main" java.lang.RuntimeException: Unknown call expression: avg(amount)
> at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:102)
> at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
> at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122)
> at org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at org.apache.flink.table.planner.expressions.converter.OverConvertRule.convert(OverConvertRule.java:81)
> at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
> at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
> at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122)
> at org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226)
> at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convertAs(CustomizedConvertRule.java:275)
> at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.lambda$convert$0(CustomizedConvertRule.java:95)
> at java.util.Optional.map(Optional.java:215)
> at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convert(CustomizedConvertRule.java:95)
> at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
> at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
> at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122)
> at org.apache.flink.table.planner.plan.QueryOperationConverter.convertExprToRexNode(QueryOperationConverter.java:741)
> at org.apache.flink.table.planner.plan.QueryOperationConverter.access$800(QueryOperationConverter.java:132)
> at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.lambda$convertToRexNodes$6(QueryOperationConverter.java:547)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.convertToRexNodes(QueryOperationConverter.java:548)
> at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:156)
> at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:152)
> at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
> at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:149)
> at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:131)
> at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
> at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
> at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:165)
> at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247)
> at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:331)
> at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:307)
> at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:298)
> at DistinctAggregation3.main(DistinctAggregation3.java:92)
>
> *the code is extended from the above part in above official link*
> *Is the official document wrong?*
>
> Thanks for your help.
>
>
>
>