Primary keys go missing after using TableFunction + leftOuterJoinLateral


Rex Fenley
Hello,

I have a TableFunction and wherever it is applied with a leftOuterJoinLateral, my table loses any inference of there being a primary key. I see this because all subsequent joins end up with "NoUniqueKey" when I know a primary key of id should exist.

I'm wondering if this is expected behavior and if it's possible to tell a table directly what the primary key should be?


To demonstrate my example:
My table function checks whether each of a fixed set of values is present in a string array and appends one boolean column per value. For example, if the array "fruits" can contain orange, banana, apple, and watermelon, and a given row contains only `["orange", "apple"]`, then the function appends `has_orange: true, has_banana: false, has_apple: true, has_watermelon: false` as columns to that row. This example is essentially the same as my code, except that my real code has a much larger set of keys and does not deal with fruits.
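Stripped of Flink, the per-row logic described above can be sketched in plain Scala. This is only an illustration of the described behavior; the names `SplatSketch`, `Keys`, and `splat` are hypothetical and do not appear in the original code.

```scala
// Plain-Scala sketch of the per-row logic only; no Flink types are involved.
// SplatSketch, Keys and splat are hypothetical names used for illustration.
object SplatSketch {
  // The fixed set of keys to test for (four fruits here, many more in the real code).
  val Keys: Seq[String] = Seq("orange", "banana", "apple", "watermelon")

  // Returns one (columnName, isPresent) pair per key, in the order of Keys.
  def splat(fruits: Array[String]): Seq[(String, Boolean)] =
    Keys.map(key => s"has_$key" -> fruits.contains(key))
}
```

Calling `SplatSketch.splat(Array("orange", "apple"))` gives `has_orange -> true, has_banana -> false, has_apple -> true, has_watermelon -> false`, matching the example row.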

Example code:

// table will always have pk id
def splatFruits(table: Table, columnPrefix: String): Table = {
  table
    .leftOuterJoinLateral(
      new SplatFruitsFunc()($"fruits") as (
        s"${columnPrefix}_has_orange",
        s"${columnPrefix}_has_banana",
        s"${columnPrefix}_has_apple",
        s"${columnPrefix}_has_watermelon"
      )
    )
    .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
}

@FunctionHint(
  output = new DataTypeHint(
    "(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN, has_watermelon BOOLEAN)"
  )
)
class SplatFruitsFunc
    extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] {

  def eval(fruits: Array[String]): Unit = {
    val hasOrange: java.lang.Boolean = fruits.contains("orange")
    val hasBanana: java.lang.Boolean = fruits.contains("banana")
    val hasApple: java.lang.Boolean = fruits.contains("apple")
    val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
    collect(hasOrange, hasBanana, hasApple, hasWatermelon)
  }
}

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

Rex Fenley
I then tried changing SplatFruitsFunc to a ScalarFunction and replacing leftOuterJoinLateral with map, and I'm receiving:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Only a scalar function can be used in the map operator.
which seems odd because the documentation says

> Performs a map operation with a user-defined scalar function or built-in scalar function. The output will be flattened if the output type is a composite type.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations

Shouldn't this work as an alternative?


Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

Rex Fenley
Looks like `as` needed to move outside of where it was before to fix that error. Now, though, I'm receiving
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Aliasing more fields than we actually have.

Example code now:

// table will always have pk id
def splatFruits(table: Table, columnPrefix: String): Table = {
  table
    .map(
      new SplatFruitsFunc()(
        $"fruits"
      )
    )
    .as(
      s"${columnPrefix}_has_orange",
      s"${columnPrefix}_has_banana",
      s"${columnPrefix}_has_apple",
      s"${columnPrefix}_has_watermelon"
    )
    .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
}

class SplatFruitsFunc extends ScalarFunction {
  // Returns one Boolean per fruit, packed into a single composite Row.
  // The names are lowercase to match the example data (["orange", "apple"]).
  def eval(fruits: Array[String]): Row = {
    val hasOrange: java.lang.Boolean = fruits.contains("orange")
    val hasBanana: java.lang.Boolean = fruits.contains("banana")
    val hasApple: java.lang.Boolean = fruits.contains("apple")
    val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
    Row.of(hasOrange, hasBanana, hasApple, hasWatermelon)
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN)
}

which, as far as I can tell, correctly follows the documentation.

Anything here stand out?

Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

Rex Fenley
Even odder, if I pull the construction of the function into its own variable, it "works". It appears, though, that map only passes through the fields mapped over, which means I'll need an additional join. Still, I think I'm on the right path now.

I.e.
def splatFruits(table: Table, columnPrefix: String): Table = {
  val func = new SplatFruitsFunc()
  table
    .map(func($"fruits"))
    .as(
      s"${columnPrefix}_has_orange",
      s"${columnPrefix}_has_banana",
      s"${columnPrefix}_has_apple",
      s"${columnPrefix}_has_watermelon"
    )
    .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
}

ends up giving me the following error instead
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot resolve field [fruits], input field list:[prefix_has_orange, prefix_has_banana, prefix_has_apple, prefix_has_watermelon].

which implies, I suppose, that I'll need to join back to the original table, as I was originally doing with leftOuterJoinLateral.


Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

Rex Fenley
It appears that even when I pass id through the map function and join back with the original table, the planner does not treat the id passed through map as a unique key. Is there any way to solve this while still preserving the primary key?

Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

Jark Wu-3
Could you use four scalar functions instead of a UDTF and the map function? For example:

select *, hasOrange(fruits), hasBanana(fruits), hasApple(fruits), hasWatermelon(fruits)
from T;

I think this can preserve the primary key. 

Best,
Jark

Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

Rex Fenley
Thanks, but how would I do this in the Table API? With map? I tried a map function, and the planner still wasn't able to infer the primary key unless I artificially applied an aggregate function over the pk instead.
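One possible Table API counterpart of the suggested SQL is to add one scalar-function column per fruit with `addColumns` rather than going through `map`. The following is a minimal sketch, assuming the Flink 1.11 Scala Table API; `HasFruit`, `SplatFruitsSketch`, and `fruitNames` are hypothetical names. Whether the planner keeps the unique key through `addColumns` is an assumption based on it being a plain projection like the `SELECT *, ...` above; the thread does not confirm it.

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical scalar function: true when the array contains the given fruit.
class HasFruit(fruit: String) extends ScalarFunction {
  def eval(fruits: Array[String]): Boolean =
    fruits != null && fruits.contains(fruit)
}

object SplatFruitsSketch {
  // Hypothetical key list; the real code would have a much larger set.
  private val fruitNames = Seq("orange", "banana", "apple", "watermelon")

  // table is assumed to have pk id and a string-array column named fruits.
  def splatFruits(table: Table, columnPrefix: String): Table = {
    val newColumns: Seq[Expression] = fruitNames.map { fruit =>
      // Bind the instance to a val before applying it, as in the variant
      // that worked earlier in the thread.
      val hasFruit = new HasFruit(fruit)
      hasFruit($"fruits").as(s"${columnPrefix}_has_$fruit")
    }
    table
      .addColumns(newColumns: _*) // keeps all existing columns, including id
      .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
  }
}
```

Because `addColumns` appends to the existing columns, no join back to the original table is needed in this sketch. Checking the plan with `explain` would show whether the downstream joins still report "NoUniqueKey".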
