Hi All,

I see strange behavior of UDAF functions.

Let's say we have a simple table:

    import static org.apache.flink.table.api.Expressions.*;
    import org.apache.flink.table.api.*;
    import org.apache.flink.table.planner.functions.aggfunctions.MaxWithRetractAggFunction;

    EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
    TableEnvironment t = TableEnvironment.create(settings);

    // Two rows, both with symbol "S"
    Table table = t.fromValues(DataTypes.ROW(
            DataTypes.FIELD("price", DataTypes.DOUBLE().notNull()),
            DataTypes.FIELD("symbol", DataTypes.STRING().notNull())
        ),
        row(1.0, "S"), row(2.0, "S"));
    t.createTemporaryView("A", table);

As an example, we will use a built-in function under a new name:

    t.createTemporaryFunction("max_value",
        new MaxWithRetractAggFunction.DoubleMaxWithRetractAggFunction());

Using the Table API we can write:

    t.createTemporaryView("B", table
        .groupBy($("symbol"))
        .select($("symbol"), call("max_value", $("price")))
    );

and get:

    org.apache.flink.table.api.TableException: Aggregate functions are not updated to the new type system yet.

Using the SQL API we can write:

    t.createTemporaryView("B", t.sqlQuery("select max_value(price) from A group by symbol"));

and get:

    org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 8 to line 1, column 23: No match found for function signature max_value(<NUMERIC>)

Calling the built-in max function instead of the alias produces correct results.

In addition, the non-retract implementation of max (MaxAggFunction.DoubleMaxAggFunction) produces:

    org.apache.flink.table.api.ValidationException: Could not register temporary catalog function 'default_catalog.default_database.max_value' due to implementation errors.

The cause: DoubleMaxAggFunction is not serializable.

Am I missing something?
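The question contrasts a retract and a non-retract max. As background, here is a minimal standalone Java sketch (no Flink dependency; every class and method name is hypothetical, loosely following the createAccumulator/accumulate/retract/getValue shape of Flink UDAFs) of why a max that supports retraction needs more state than a single running maximum: once the current maximum is retracted, the aggregate still has to know the next-largest live value. It is not Flink's MaxWithRetractAggFunction, which is more involved.

```java
import java.util.TreeMap;

/**
 * Hypothetical, Flink-free sketch of a retractable max aggregate.
 * The accumulator keeps a count per distinct value so that retracting
 * the current maximum can fall back to the next-largest live value.
 */
public class RetractableMaxSketch {

    /** Accumulator: how many times each value is currently live in the group. */
    public static class MaxAcc {
        final TreeMap<Double, Long> counts = new TreeMap<>();
    }

    public static MaxAcc createAccumulator() {
        return new MaxAcc();
    }

    /** Called for every inserted row. */
    public static void accumulate(MaxAcc acc, double value) {
        acc.counts.merge(value, 1L, Long::sum);
    }

    /** Called for every retracted row (a delete, or the old side of an update). */
    public static void retract(MaxAcc acc, double value) {
        Long c = acc.counts.get(value);
        if (c == null) {
            return; // value is not live; this sketch simply ignores the retraction
        }
        if (c == 1L) {
            acc.counts.remove(value);
        } else {
            acc.counts.put(value, c - 1);
        }
    }

    /** Current maximum, or null when the group is empty. */
    public static Double getValue(MaxAcc acc) {
        return acc.counts.isEmpty() ? null : acc.counts.lastKey();
    }

    public static void main(String[] args) {
        MaxAcc acc = createAccumulator();
        accumulate(acc, 1.0);
        accumulate(acc, 2.0);
        System.out.println(getValue(acc)); // 2.0
        retract(acc, 2.0);
        System.out.println(getValue(acc)); // 1.0
    }
}
```

Counting duplicates matters: if 2.0 arrives twice and is retracted once, the max must stay 2.0, which a plain set of values would get wrong. A non-retract max only needs the single running maximum.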
Hi Dmytro,
aggregate functions will support the new type system in Flink 1.12. Until then, they cannot be used with the new `call()` syntax as anonymous functions. To use the old type system, you need to register the function explicitly using SQL `CREATE FUNCTION a AS 'myFunc'` and then call it by its registered name, i.e. `call("a", ...)`.

The mentioned "No match found for function signature fun(<NUMERIC>)" was a bug that got fixed in 1.11.1:

https://issues.apache.org/jira/browse/FLINK-18520

This bug only exists for catalog functions, not temporary system functions.

Regards,
Timo

On 27.07.20 16:35, Dmytro Dragan wrote:
> [...]
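Registering with `CREATE FUNCTION a AS '...'` means the catalog stores a class name and instantiates that class when the function is resolved. The sketch below is a hypothetical, stdlib-only registry (not Flink's FunctionCatalog) that illustrates the idea with plain Java reflection, assuming Flink resolves the name through standard class loading. It also shows two practical consequences: a nested class must be referenced by its binary name with `$` (as in the `JavaUserDefinedAggFunctions$WeightedAvg` string used later in this thread), and the class needs an accessible no-argument constructor.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical name-to-class registry illustrating "CREATE FUNCTION name AS 'className'":
 * only the class name is stored, and the class is instantiated reflectively on lookup.
 * This is NOT Flink's FunctionCatalog.
 */
public class ClassNameRegistrySketch {

    /** Stand-in for a user-defined function type (hypothetical). */
    public interface Fn {
        double apply(double a, double b);
    }

    /** Nested class: its binary name is "...ClassNameRegistrySketch$MaxFn". */
    public static class MaxFn implements Fn {
        public MaxFn() {} // reflective instantiation needs an accessible no-arg constructor

        @Override
        public double apply(double a, double b) {
            return Math.max(a, b);
        }
    }

    private final Map<String, String> classNames = new HashMap<>();

    /** Roughly analogous to: CREATE TEMPORARY FUNCTION name AS 'className'. */
    public void createFunction(String name, String className) {
        classNames.put(name, className);
    }

    /** Loads the stored class by its binary name and creates a fresh instance. */
    public Fn lookup(String name) throws ReflectiveOperationException {
        String className = classNames.get(name);
        if (className == null) {
            throw new IllegalArgumentException("No function registered under " + name);
        }
        Class<?> cls = Class.forName(className);
        return (Fn) cls.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        ClassNameRegistrySketch registry = new ClassNameRegistrySketch();
        // getName() returns the binary name, with '$' separating the nested class
        registry.createFunction("a", MaxFn.class.getName());
        System.out.println(registry.lookup("a").apply(1.0, 2.0)); // 2.0
    }
}
```

Writing the nested class with a dot (`...ClassNameRegistrySketch.MaxFn`) makes `Class.forName` look for a top-level class `MaxFn` in a package of that name, so the lookup fails with `ClassNotFoundException`.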
Hi Timo,
I have switched to 1.11.1.

Creating the function using "CREATE FUNCTION ..." fails with this puzzling exception:

    Caused by: java.lang.IndexOutOfBoundsException: Index: 110, Size: 0
        at java.util.ArrayList.rangeCheck(ArrayList.java:657)
        at java.util.ArrayList.get(ArrayList.java:433)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:148)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
        at org.apache.flink.table.dataview.MapViewSerializer.deserialize(MapViewSerializer.java:96)
        at org.apache.flink.table.dataview.MapViewSerializer.deserialize(MapViewSerializer.java:46)
        at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
        at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:63)
        at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:709)
        at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:714)
        at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:401)
        at org.apache.flink.table.data.util.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1392)
        at org.apache.flink.table.data.util.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1356)
        at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:383)
        at SortAggregateWithKeys$79.processElement(Unknown Source)

"No match found for function signature fun(<NUMERIC>)" still exists. The mentioned bug was about TableFunction, so maybe this is something different, but related.

I have created a small GitHub project with both cases:
https://github.com/dmytroDragan/flink-1.11-sql-agg-issue/blob/master/src/test/java/lets/test/flink/AggFunTest.java

I would appreciate it if you could take a look.

On 27/07/2020, 16:49, "Timo Walther" wrote:
> [...]
Hi Dmytro,
I would not recommend using internal functions from `org.apache.flink.table.planner.functions.aggfunctions`. They are called through a slightly different stack that might cause this exception. Instead, you can use the testing functions in `org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions`.

Your examples work in my 1.11 branch:

    @Test
    public void testWithCreateFunction() {
        initInput();
        // Register the UDAF by class name; the nested class is referenced with '$'
        String functionClass = "org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions$WeightedAvg";
        String createFunQuery = String.format("CREATE TEMPORARY FUNCTION a AS '%s'", functionClass);
        tableEnv.executeSql(createFunQuery);

        // Call the function by its registered name "a" from the Table API
        tableEnv.createTemporaryView("B", tableEnv.from("A")
            .groupBy($("symbol"))
            .select($("symbol"), call("a", $("price").cast(DataTypes.INT()), 12))
        );

        Table res = tableEnv.from("B");
        res.execute().print();
    }

    @Test
    public void testWithRegisterFunction() {
        initInput();
        String functionClass = "org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions$WeightedAvg";
        String createFunQuery = String.format("CREATE TEMPORARY FUNCTION a AS '%s'", functionClass);
        tableEnv.executeSql(createFunQuery);

        // Same registration, but the aggregate is invoked from SQL
        Table res = tableEnv.sqlQuery("select a(CAST(price AS INT), 12) as max_price from A group by symbol");
        res.execute().print();
    }

Regards,
Timo

On 28.07.20 17:20, Dmytro Dragan wrote:
> [...]
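Timo's example calls `a(CAST(price AS INT), 12)`, i.e. a weighted average with a constant weight of 12. The sketch below is a standalone Java mock (no Flink dependency) of a weighted-average accumulator modeled on the WeightedAvg example in Flink's user-defined function documentation; that the test class `JavaUserDefinedAggFunctions$WeightedAvg` behaves the same way, including the long division, is an assumption. With the two sample rows (prices 1.0 and 2.0 cast to 1 and 2) it computes (1*12 + 2*12) / (12 + 12) = 36 / 24, which truncates to 1.

```java
/**
 * Flink-free sketch of a weighted-average aggregate in the accumulator style of Flink UDAFs.
 * Field and method names are illustrative and may differ from Flink's test class.
 */
public class WeightedAvgSketch {

    /** Accumulator: running weighted sum and total weight. */
    public static class WeightedAvgAccum {
        public long sum = 0;
        public int count = 0;
    }

    public static WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    /** Called once per input row; value is weighted by weight. */
    public static void accumulate(WeightedAvgAccum acc, int value, int weight) {
        acc.sum += (long) value * weight;
        acc.count += weight;
    }

    /** Returns null for an empty group; otherwise long division, truncating any fraction. */
    public static Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null;
        }
        return acc.sum / acc.count;
    }

    public static void main(String[] args) {
        // The two sample rows from the thread: price 1.0 and 2.0 cast to INT, weight 12
        WeightedAvgAccum acc = createAccumulator();
        accumulate(acc, 1, 12);
        accumulate(acc, 2, 12);
        System.out.println(getValue(acc)); // 1
    }
}
```

Because the weight is constant, it cancels out and the result is the plain average of the truncated prices, further truncated by the integer division. This is why the `max_price` alias in the second test does not actually hold a maximum.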
Hi Timo,
Thank you for your time and your help! The described approach works.

On 29/07/2020, 10:22, "Timo Walther" wrote:
> [...]