Case Class TypeInformation


Case Class TypeInformation

Joshua Griffith
Hello,

I have a case class that wraps a Flink Row and I’d like to use fields from that Row in a delta iteration join condition. I only have the row’s fields after the job starts. I can construct RowTypeInfo for the Row but I’m not sure how to add that to Flink’s generated type information for the case class. Without it, I understandably get the following error because Flink doesn’t know the Row’s TypeInformation:

org.apache.flink.api.common.InvalidProgramException: This type (GenericType<org.apache.flink.types.Row>) cannot be used as key.

Is there a way to manually construct or annotate the type information for the case class to provide the Row’s type information so it can be used in a join? I could alternatively replace the case class with a Tuple and construct a TupleTypeInfo, but a tuple is more difficult to use than a case class.

Thanks,

Joshua
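A minimal sketch of where the error comes from. It assumes Flink's Scala DataSet API (flink-scala) is on the classpath. `Wrapper` and `RowFieldTypeCheck` are hypothetical stand-ins for the case class described above. The sketch asks Flink's Scala type-analysis macro (`createTypeInformation`) for the type information of a case class that wraps a Row, then inspects the entry for the Row field:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.types.Row

// Hypothetical case class mirroring the one in the question: it wraps a Row.
case class Wrapper(id: Long, row: Row)

object RowFieldTypeCheck {
  // createTypeInformation is expanded at compile time, when the Row's
  // arity and field types are not yet known.
  val wrapperType: CaseClassTypeInfo[Wrapper] =
    createTypeInformation[Wrapper].asInstanceOf[CaseClassTypeInfo[Wrapper]]

  // Type information chosen for the `row` field (position 1).
  val rowFieldType: TypeInformation[Row] = wrapperType.getTypeAt[Row](1)

  def main(args: Array[String]): Unit = {
    // Row is treated as an opaque generic type here...
    println(rowFieldType)
    // ...which is why using its fields as join keys is rejected.
    println(s"usable as key: ${rowFieldType.isKeyType}")
  }
}
```

Because the macro cannot see the Row's schema, it falls back to a generic type. GenericTypeInfo treats a class as a key type only if it implements Comparable, and Row does not.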

Re: Case Class TypeInformation

Joshua Griffith
Correction: I have the row’s RowTypeInfo at runtime before the job starts. I don’t have RowTypeInfo at compile time.
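Because the schema is available at runtime before the job is built, a RowTypeInfo can be assembled from ordinary arrays. The following is a minimal sketch. The two-field schema (name: String, count: Long) and the object `RuntimeRowType` are hypothetical; in practice the schema would come from configuration or metadata read at startup:

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo

object RuntimeRowType {
  // Hypothetical schema discovered when the program starts, i.e. known
  // before the job is built but not at compile time.
  val schema: Seq[(String, TypeInformation[_])] = Seq(
    "name"  -> BasicTypeInfo.STRING_TYPE_INFO,
    "count" -> BasicTypeInfo.LONG_TYPE_INFO
  )

  // RowTypeInfo takes the field types and (optionally) the field names as arrays.
  val rowType: RowTypeInfo = new RowTypeInfo(
    schema.map(_._2).toArray,
    schema.map(_._1).toArray
  )

  def main(args: Array[String]): Unit = println(rowType)
}
```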

In reply to Joshua Griffith's message of Oct 16, 2017, 4:15 PM.

Re: Case Class TypeInformation

Fabian Hueske
Hi Joshua,

that's a limitation of the Scala API.
Row requires you to explicitly specify a TypeInformation[Row], but it is not possible to inject custom type information into a CaseClassTypeInfo, which is generated automatically at compile time by a Scala macro.

Probably the easiest solution is to use Flink's Java Tuple classes instead of a case class.

You can import the Java Tuples with
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}

And create a TupleTypeInfo, for example, with
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
import org.apache.flink.types.Row
new TupleTypeInfo[JTuple2[Row, java.lang.Double]](new RowTypeInfo(Types.STRING, Types.LONG), Types.DOUBLE)
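A minimal sketch of this approach. It assumes Flink's Scala DataSet API plus a local runtime on the classpath, and a Flink version that ships `org.apache.flink.api.common.typeinfo.Types`. The object `TupleKeySketch`, the `record` helper, and the (STRING, LONG) Row schema are illustrative only. The TupleTypeInfo is declared as an explicitly typed implicit value. Because it is more specific than the generic `createTypeInformation` macro, it should be picked up instead of the macro-generated type information. A nested Row field can then be addressed with a field expression such as "f0.f0". To keep the sketch short, it uses a plain join rather than a delta iteration:

```scala
import java.lang.{Double => JDouble}

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

object TupleKeySketch {
  // Java Tuple2 in place of the case class: f0 holds the Row, f1 a score.
  type Record = JTuple2[Row, JDouble]

  // Explicit type information built from the Row schema known at startup.
  implicit val recordType: TypeInformation[Record] =
    new TupleTypeInfo[Record](new RowTypeInfo(Types.STRING, Types.LONG), Types.DOUBLE)

  // Hypothetical helper that builds one record.
  def record(name: String, id: Long, score: Double): Record = {
    val row = new Row(2)
    row.setField(0, name)
    row.setField(1, Long.box(id))
    new JTuple2(row, JDouble.valueOf(score))
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val left = env.fromCollection(Seq(record("a", 1L, 0.5), record("b", 2L, 1.5)))
    val right = env.fromCollection(Seq(record("a", 1L, 10.0)))

    // "f0.f0" selects the first field of the Row stored in the tuple's field f0.
    val joined = left.join(right).where("f0.f0").equalTo("f0.f0") {
      (l: Record, r: Record) =>
        (l.f0.getField(0).asInstanceOf[String], l.f1.doubleValue + r.f1.doubleValue)
    }
    joined.print()
  }
}
```

Accessing fields as `f0`/`f1` and casting Row fields is the usability cost that the question alludes to. In exchange, the key field now has a concrete, key-capable type.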

Best, Fabian


In reply to Joshua Griffith's message of 2017-10-16 23:26 GMT+02:00.

Re: Case Class TypeInformation

Joshua Griffith
Hello Fabian,

Thank you for the suggestion. I see that an issue has been created to support adding custom type information to case classes: 

Joshua


In reply to Fabian Hueske's message of Oct 17, 2017, 3:01 AM.

Re: Case Class TypeInformation

Fabian Hueske
Yes, that JIRA was actually motivated by your question.
Thanks for the feedback :-)

In reply to Joshua Griffith's message of 2017-10-25 17:14 GMT+02:00.