Hello, I have a TableFunction and wherever it is applied with a leftOuterJoinLateral, my table loses any inference of there being a primary key. I see this because all subsequent joins end up with "NoUniqueKey" when I know a primary key of id should exist. I'm wondering if this is expected behavior and if it's possible to tell a table directly what the primary key should be? To demonstrate my example: My table function checks if an element of a certain type is in a string array, and depending on whether or not it is there, it appends a column with value true or false. For example, if array "fruits" which could possibly contain orange, banana, apple, and watermelon on a row contains only `["orange", "apple"]` then it will append `has_orange: true, has_banana: false, has_apple: true, has_watermelon: false` as columns to the row. This example is essentially the same as my code, outside of having a much larger set of keys and not dealing with fruits. Example code: // table will always have pk id def splatFruits(table: Table, columnPrefix: String): Table = { return table .leftOuterJoinLateral( new SplatFruitsFunc()( $"fruits" ) as (s"${columnPrefix}_has_orange", s"${columnPrefix}_has_banana", s"${columnPrefix}_has_apple", s"${columnPrefix}_has_watermelon") ) .renameColumns($"fruits".as(s"${columnPrefix}_fruits")) } @FunctionHint( output = new DataTypeHint( "(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN, has_watermelon BOOLEAN)" ) ) class SplatFruitsFunc extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] { def eval(fruits: Array[String]): Unit = { val hasOrange: java.lang.Boolean = fruits.contains("orange") val hasBanana: java.lang.Boolean = fruits.contains("banana") val hasApple: java.lang.Boolean = fruits.contains("apple") val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon") collect(hasOrange, hasBanana, hasApple, hasWatermelon) } } Thanks! -- Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG | FOLLOW US | LIKE US |
So I just instead tried changing SplatFruitsFunc to a ScalaFunction and leftOuterJoinLateral to a map and I'm receiving: > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Only a scalar function can be used in the map operator. which seems odd because documentation says > Performs a map operation with a user-defined scalar function or built-in scalar function. The output will be flattened if the output type is a composite type. Shouldn't this work as an alternative? On Wed, Dec 2, 2020 at 3:58 PM Rex Fenley <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG | FOLLOW US | LIKE US |
Looks like `as` needed to move outside of where it was before to fix that error. Though now I'm receiving >org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Aliasing more fields than we actually have. Example code now: // table will always have pk id def splatFruits(table: Table, columnPrefix: String): Table = { return table .map( new SplatFruitsFunc()( $"fruits" ) ) .as( s"${columnPrefix}_has_orange", s"${columnPrefix}_has_banana", s"${columnPrefix}_has_apple", s"${columnPrefix}_has_watermelon" ) .renameColumns($"fruits".as(s"${columnPrefix}_fruits")) } class SplatFruitsFunc extends ScalarFunction { def eval(fruits: Array[String]): Row = { val hasOrange: java.lang.Boolean = fruits.contains("Orange") val hasBanana: java.lang.Boolean = fruits.contains("Banana") val hasApple: java.lang.Boolean = fruits.contains("Apple") val hasWatermelon: java.lang.Boolean = fruits.contains("Watermelon") Row.of(hasOrange, hasBanana, hasApple, hasWatermelon) } override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = Types.ROW(Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN) } which afaict correctly follows the documentation. Anything here stand out? On Wed, Dec 2, 2020 at 4:55 PM Rex Fenley <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG | FOLLOW US | LIKE US |
Even odder, if I pull the constructor of the function into its own variable it "works" (though it appears that map only passes through the fields mapped over which means I'll need an additional join, though now I think I'm on the right path). I.e. def splatFruits(table: Table, columnPrefix: String): Table = { val func = new SplatFruitsFunc() return table .map(func($"fruits")) .as( s"${columnPrefix}_has_orange", s"${columnPrefix}_has_banana", s"${columnPrefix}_has_apple", s"${columnPrefix}_has_watermelon" ) .renameColumns($"fruits".as(s"${columnPrefix}_fruits")) } ends up giving me the following error instead > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot resolve field [fruits], input field list:[prefix_has_orange, prefix_has_banana, prefix_has_apple, prefix_has_watermelon]. which implies I'll need to join back to the original table like I was doing with the leftOuterJoinLateral originally I suppose. On Wed, Dec 2, 2020 at 5:15 PM Rex Fenley <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG | FOLLOW US | LIKE US |
It appears that even when I pass id through the map function and join back with the original table, it does not seem to think that the id passed through map is a unique key. Is there any way to solve this while still preserving the primary key? On Wed, Dec 2, 2020 at 5:27 PM Rex Fenley <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG | FOLLOW US | LIKE US |
Could you use 4 scalar functions instead of UDTF and map function? For example; select *, hasOrange(fruits), hasBanana(fruits), hasApple(fruits), hasWatermelon(fruits) from T; I think this can preserve the primary key. Best, Jark On Thu, 3 Dec 2020 at 15:28, Rex Fenley <[hidden email]> wrote:
|
Thanks, but how would I do this in the Table API? Map? I tried a map function and it still wasn't able to infer the primary key unless I artificially instead did an aggregate function over the pk. On Wed, Dec 9, 2020 at 7:55 PM Jark Wu <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG | FOLLOW US | LIKE US |
Free forum by Nabble | Edit this page |