Nested Field Expressions with Rows

Nested Field Expressions with Rows

Joshua Griffith
Hello,

When using nested field expressions like “Account.Id” with nested rows, I get the following error: “This type (GenericType&lt;org.apache.flink.types.Row&gt;) cannot be used as key.” Is there a way to make nested field expressions work with nested rows?

Thanks,

Joshua
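For context, the setup being described looks roughly like the following sketch (schema and field names are hypothetical, for illustration only): a RowTypeInfo whose field is itself a RowTypeInfo, keyed with a nested expression such as "Account.Id".

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo

// Hypothetical nested schema: Transaction(Account: Row(Id: Int, Name: String), Amount: Int)
val accountType: TypeInformation[_] = new RowTypeInfo(
  Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
  Array("Id", "Name"))

val transactionType = new RowTypeInfo(
  Array[TypeInformation[_]](accountType, BasicTypeInfo.INT_TYPE_INFO),
  Array("Account", "Amount"))

// With this RowTypeInfo attached to the DataSet, a nested key expression like
// dataSet.groupBy("Account.Id") should resolve. The error quoted above occurs
// when the type instead falls back to GenericType<Row>, which supports no
// field expressions at all.
```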

Re: Nested Field Expressions with Rows

Nico Kruber
Can you show a minimal example of the query you are trying to run?
Maybe Timo or Fabian (cc'd) can help.


Nico


Re: Nested Field Expressions with Rows

Joshua Griffith
Thank you for your response Nico. Below is a simple case where I’m trying to join on Row fields:

package com.github.hadronzoo.rowerror

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

object Main {

  class MakeRow extends MapFunction[(Integer, Integer), Row] with ResultTypeQueryable[Row] {
    override def map(tuple: (Integer, Integer)): Row = tuple match {
      case (value, id) => Row.of(id, value)
    }

    override def getProducedType: TypeInformation[Row] =
      new RowTypeInfo(
        Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
        Array("id", "value")
      )
  }

  def integerTuple(intTuple: (Int, Int)): (Integer, Integer) = intTuple match { case (a, b) => (a, b) }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment()
    val rowFn = new MakeRow

    val ints = 0 until 1000
    val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
    val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)

    val evenRows = env.fromCollection(evenIntegers).map(rowFn)
    val oddRows = env.fromCollection(oddIntegers).map(rowFn)

    evenRows.join(oddRows).where("id").equalTo("id").print()
  }
}

Executing the above yields the following error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<org.apache.flink.types.Row>) cannot be used as key.
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
at org.apache.flink.api.scala.UnfinishedKeyPairOperation.where(unfinishedKeyPairOperation.scala:72)
at com.github.hadronzoo.rowerror.Main$.main(Main.scala:36)
at com.github.hadronzoo.rowerror.Main.main(Main.scala)

For my application I only have TypeInformation at runtime (before the execution graph is built). Is it possible to use Row fields in join operations or is there an error with my implementation?

Thanks for your help,

Joshua


Re: Nested Field Expressions with Rows

Fabian Hueske
Hi Joshua,

thanks for reporting this issue. Your code is fine, but IMO there is a bug in the Scala DataSet API: it simply does not respect the type information provided by the ResultTypeQueryable[Row] interface and defaults to a GenericType.

I think this should be fixed. I'll open a JIRA issue for it.

You can explicitly declare the type with an implicit if you put the following lines above the lines in which you apply rowFn to the DataSet:

implicit val rowTpe: TypeInformation[Row] = new RowTypeInfo(
  Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
  Array("id", "value")
)

When you do this, you can also remove the ResultTypeQueryable interface from the MapFunction.

Cheers, Fabian




Re: Nested Field Expressions with Rows

Joshua Griffith
Hello Fabian,

Thank you for your response. I tried your recommendation but I’m getting the same issue. Here’s the altered MakeRow MapFunction I tried:

  class MakeRow extends MapFunction[(Integer, Integer), Row] {
    implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
      Array("id", "value")
    )
    override def map(tuple: (Integer, Integer)): Row = tuple match {
      case (value, id) => Row.of(id, value)
    }
  }
 
In stepping through the code execution, it looks like the problem is that Row.isKeyType() returns false. Any recommendations?

Thanks,

Joshua
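For background on that observation (paraphrasing the Flink 1.3 sources, so treat the details as an approximation rather than verbatim code): RowTypeInfo is a composite type and reports itself as a valid key type, but the fallback GenericTypeInfo only accepts types that implement Comparable, which Row does not.

```scala
import org.apache.flink.types.Row

// GenericTypeInfo[T].isKeyType is (approximately) a Comparable check;
// Row does not implement Comparable, so GenericType<Row> is rejected as a key:
val rowIsComparable = classOf[Comparable[_]].isAssignableFrom(classOf[Row])
// rowIsComparable == false  =>  isKeyType() == false for GenericType<Row>
```

This is why forcing the DataSet's type to be a proper RowTypeInfo, rather than letting it default to a GenericType, makes the "id" key expression work.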



Re: Nested Field Expressions with Rows

Joshua Griffith
I apologize, that was the wrong link. Here’s where the exception is thrown: https://github.com/apache/flink/blob/release-1.3.1-rc2/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java#L329-L331


Re: Nested Field Expressions with Rows

Fabian Hueske
Hi,

You have to add the implicit value in the main() method before you call .map(rowFn) and not in the MapFunction.

Best, Fabian
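Concretely, adapting the program from earlier in the thread, the placement Fabian describes can be sketched like this (the Scala DataSet API's `map` resolves an implicit TypeInformation for its result type at the call site, so the implicit must be visible there):

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

def main(args: Array[String]): Unit = {
  val env = ExecutionEnvironment.createLocalEnvironment()
  val rowFn = new MakeRow // ResultTypeQueryable can now be dropped from MakeRow

  // In scope *before* .map(rowFn), so map's implicit parameter picks it up:
  implicit val rowTpe: TypeInformation[Row] = new RowTypeInfo(
    Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
    Array("id", "value"))

  // evenIntegers and oddIntegers as defined in the original program
  val evenRows = env.fromCollection(evenIntegers).map(rowFn) // typed as Row(id, value)
  val oddRows  = env.fromCollection(oddIntegers).map(rowFn)

  evenRows.join(oddRows).where("id").equalTo("id").print()
}
```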



Re: Nested Field Expressions with Rows

Joshua Griffith
Indeed that worked. Thanks!
