Hi, all I am trying to do something like this ``` tEnv .sqlQuery("SELECT rawEvent.id, collect(rawEvent.name) FROM rawEvent GROUP BY rawEvent.id") .toRetractStream[(Long, java.util.Map[String, java.lang.Integer])] ``` An exception is thrown when I ran the above code with the default planner setting in 1.10.1. I presume I am using the older planner. ``` Exception in thread "main" org.apache.flink.table.api.TableException: Result field does not match requested type. Requested: GenericType<java.util.Map>; Actual: Multiset<String> at org.apache.flink.table.planner.Conversions$.$anonfun$generateRowConverterFunction$2(Conversions.scala:104) at org.apache.flink.table.planner.Conversions$.$anonfun$generateRowConverterFunction$2$adapted(Conversions.scala:98) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at org.apache.flink.table.planner.Conversions$.generateRowConverterFunction(Conversions.scala:98) at org.apache.flink.table.planner.DataStreamConversions$.getConversionMapperWithChanges(DataStreamConversions.scala:184) at org.apache.flink.table.planner.DataStreamConversions$.convert(DataStreamConversions.scala:90) at org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:413) at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:402) at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185) at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.map(TraversableLike.scala:273) at scala.collection.TraversableLike.map$(TraversableLike.scala:266) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210) at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:127) at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146) at io.redacted.sub.package$.getMatchesWithParent(package.scala:244) at io.redacted.sub.package$.process(package.scala:156) at io.redacted.DataAggregator$.main(DataAggregator.scala:15) at io.redacted.DataAggregator.main(DataAggregator.scala) Process finished with exit code 1 ``` The result type of aggregation function collect is multiset. How do I convert it to a `java.util.Map[String, java.lang.Integer]`? Cheers, YI
|
Hi YI,
not all conversion might be supported in the `toRetractStream` method. Unfortunately, the rework of the type system is still in progress. I hope we can improve the user experience there quite soon. Have you tried to use `Row` instead? `toRetractStream[Row]` should work for all data types. A subsequent conversion MapFunction can then transform the data in your desired representation. Regards, Timo On 28.06.20 09:37, YI wrote: > Hi, all > > I am trying to do something like this > ``` > tEnv > .sqlQuery("SELECT rawEvent.id, collect(rawEvent.name) FROM rawEvent > GROUP BY rawEvent.id") > .toRetractStream[(Long, java.util.Map[String, java.lang.Integer])] > ``` > > An exception is thrown when I ran the above code with the default > planner setting in 1.10.1. I presume I am using the older planner. > > ``` > Exception in thread "main" org.apache.flink.table.api.TableException: > Result field does not match requested type. Requested: > GenericType<java.util.Map>; Actual: Multiset<String> > at > org.apache.flink.table.planner.Conversions$.$anonfun$generateRowConverterFunction$2(Conversions.scala:104) > at > org.apache.flink.table.planner.Conversions$.$anonfun$generateRowConverterFunction$2$adapted(Conversions.scala:98) > at > scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) > at > scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) > at > org.apache.flink.table.planner.Conversions$.generateRowConverterFunction(Conversions.scala:98) > at > org.apache.flink.table.planner.DataStreamConversions$.getConversionMapperWithChanges(DataStreamConversions.scala:184) > at > org.apache.flink.table.planner.DataStreamConversions$.convert(DataStreamConversions.scala:90) > at > org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:413) > at > org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:402) > at > org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185) > at > org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableLike.map(TraversableLike.scala:273) > at scala.collection.TraversableLike.map$(TraversableLike.scala:266) > at scala.collection.AbstractTraversable.map(Traversable.scala:108) > at > org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117) > at > org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210) > at > org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:127) > at > org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146) > at io.redacted.sub.package$.getMatchesWithParent(package.scala:244) > at io.redacted.sub.package$.process(package.scala:156) > at io.redacted.DataAggregator$.main(DataAggregator.scala:15) > at io.redacted.DataAggregator.main(DataAggregator.scala) > > Process finished with exit code 1 > ``` > > The result type of aggregation function collect is multiset. How do I > convert it to a `java.util.Map[String, java.lang.Integer]`? > > Cheers, > YI |
Free forum by Nabble | Edit this page |