Serialisation problem


Abdulrahman kaitoua


Hello,

I would like some directions to make my code work again (thanks in advance). My code used to work on Flink versions up to 0.9.1, but when I moved to 0.10 or 0.10.1 I got the following exception:

This type (ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>) cannot be used as key

I understand that it is related to the serialisation of objects. I tried to follow the POJO building directions in https://cwiki.apache.org/confluence/display/FLINK/Type+System,+Type+Extraction,+Serialization but could not make it work.

My dataset contains a set of tuples as key and one array of GValues. This is a snapshot of the GValue class:


sealed trait GValue extends Serializable with Ordered[GValue]{
  def compare(o : GValue) : Int = {
    o match {
      case GDouble(v) => this.asInstanceOf[GDouble].v compare v
      case GString(v) => this.asInstanceOf[GString].v compare v
      case GInt(v) => this.asInstanceOf[GInt].v compare v
      case GNull() => 0
    }
  }
  def equal(o : GValue) : Boolean = {
    o match {
      case GInt(value) => try{value.equals(o.asInstanceOf[GInt].v)} catch { case e : Throwable => false }
      case GDouble(value) => try{value.equals(o.asInstanceOf[GDouble].v)} catch { case e : Throwable => false }
      case GString(value) => try{value.equals(o.asInstanceOf[GString].v)} catch { case e : Throwable => false }
      case GNull() => o.isInstanceOf[GNull]
      case _ => false
    }
  }
}

/**
 * Represents a @GValue that contains an integer
 * @deprecated
 * @param v
 */
case class GInt(v: Int) extends GValue{
  def this() = this(0)
  override def toString() : String = {
    v.toString
  }
  override def equals(other : Any) : Boolean = {
    other match {
      case GInt(value) => value.equals(v)
      case _ => false
    }
  }
}

/**
 * Represents a @GValue that contains a number as a @Double
 * @param v number
 */
case class GDouble(v: Double) extends GValue {//with Ordered[GDouble]{

  def this() = this(0.0)

  override def equals(other : Any) : Boolean = {
    other match {
      case GDouble(value) => value.equals(v)
      case _ => false
    }
  }
}

/**
 * Represents a @GValue that contains a @String
 * @param v string
 */
case class GString(v: String) extends GValue{
  def this() = this(".")
  override def toString() : String = {
    v.toString
  }
  override def equals(other : Any) : Boolean = {
    other match {
      case GString(value) => value.equals(v)
      case _ => false
    }
  }
}

Regards,

-----------------------------------------------------------------
Abdulrahman Kaitoua
-----------------------------------------------------------------
Ph.D. Candidate at Politecnico Di Milano


Re: Serialisation problem

rmetzger0
Hi,

Can you check the log output in your IDE or the log files of the Flink client (./bin/flink)? The TypeExtractor logs why a class is not recognized as a POJO.

The log statements look like this:

20:42:43,465 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class com.dataartisans.debug.MyPojo must have a default constructor to be used as a POJO.
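
The condition named in that log line can be checked by hand with plain JVM reflection. The following is only a sketch: the class names are hypothetical stand-ins, and the real TypeExtractor checks more than the constructor.

```scala
// Hypothetical stand-ins for user classes; this is not Flink code.
case class WithDefault(v: Int) {
  def this() = this(0) // auxiliary no-argument constructor
}
case class WithoutDefault(v: Int)

object PojoCheck {
  // True if the class exposes a public constructor that takes no arguments.
  def hasDefaultConstructor(c: Class[_]): Boolean =
    c.getConstructors.exists(_.getParameterTypes.isEmpty)
}
```

Calling PojoCheck.hasDefaultConstructor(classOf[WithoutDefault]) returns false, which corresponds to the situation the log message describes.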



On Thu, Dec 10, 2015 at 11:24 PM, Abdulrahman kaitoua <[hidden email]> wrote:





Re: Serialisation problem

Aljoscha Krettek
Hi,
The problem could be that GValue is not Comparable. Could you try making it extend Comparable (the Java Comparable)?

Cheers,
Aljoscha
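
One detail worth knowing here: scala.math.Ordered[A] itself extends java.lang.Comparable[A], so a trait that mixes in Ordered is already a Java Comparable and the explicit mix-in should be redundant. The sketch below shows this with simplified, hypothetical stand-ins for the GValue hierarchy. It also matches on both operands instead of casting `this`, and the cross-variant ordering by `rank` is an assumption made for illustration.

```scala
// Hypothetical, simplified stand-ins for the value classes in this thread.
sealed trait Value extends Ordered[Value] {
  // Rank used only to order values of different variants (an assumption).
  private def rank: Int = this match {
    case NullV      => 0
    case IntV(_)    => 1
    case DoubleV(_) => 2
    case StringV(_) => 3
  }

  // Match on both sides so that no unchecked cast of `this` is needed.
  def compare(that: Value): Int = (this, that) match {
    case (IntV(a), IntV(b))       => a compare b
    case (DoubleV(a), DoubleV(b)) => a compare b
    case (StringV(a), StringV(b)) => a compare b
    case _                        => rank compare that.rank
  }
}
case class IntV(v: Int) extends Value
case class DoubleV(v: Double) extends Value
case class StringV(v: String) extends Value
case object NullV extends Value

object ComparableDemo {
  // Compiles without any explicit Comparable mix-in on Value.
  val asJavaComparable: java.lang.Comparable[Value] = IntV(1)
}
```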

> On 12 Dec 2015, at 20:43, Robert Metzger <[hidden email]> wrote:


RE: Serialisation problem

Abdulrahman kaitoua

I still have the same problem even after making GValue extend Comparable. I think the problem might be that Array[GValue] is not comparable, rather than the GValues themselves, but I do not know how to fix it in Flink. Maybe some implicit ordering would work? I also do not see why this field, Array[GValue], is compared at all. I appreciate any help.

I still do not understand why this was not a problem in previous versions.

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>) cannot be used as key.
    at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:308)
    at org.apache.flink.api.java.operators.DistinctOperator.<init>(DistinctOperator.java:56)
    at org.apache.flink.api.scala.DataSet.distinct(DataSet.scala:740)
    at it.polimi.genomics.flink.FlinkImplementation.operator.region.GenometricMap4$.execute(GenometricMap4.scala:71)

sealed trait GValue extends Serializable with Comparable[GValue] with Ordered[GValue]{
  def compare(o : GValue) : Int = {
    o match {
      case GDouble(v) => this.asInstanceOf[GDouble].v compare v
      case GString(v) => this.asInstanceOf[GString].v compare v
      case GInt(v) => this.asInstanceOf[GInt].v compare v
      case _ => 0
    }
  }
  def equal(o : GValue) : Boolean = {
    o match {
      case GInt(value) => try{value.equals(o.asInstanceOf[GInt].v)} catch { case e : Throwable => false }
      case GDouble(value) => try{value.equals(o.asInstanceOf[GDouble].v)} catch { case e : Throwable => false }
      case GString(value) => try{value.equals(o.asInstanceOf[GString].v)} catch { case e : Throwable => false }
      case GNull() => o.isInstanceOf[GNull]
      case _ => false
    }
  }
  override def compareTo(o: GValue): Int = {
    o match {
      case GInt(value) => try{value.compareTo(o.asInstanceOf[GInt].v)} catch { case e : Throwable => 0 }
      case GDouble(value) => try{value.compareTo(o.asInstanceOf[GDouble].v)} catch { case e : Throwable => 0 }
      case GString(value) => try{value.compareTo(o.asInstanceOf[GString].v)} catch { case e : Throwable => 0 }
      case GNull() => 0
      case _ => 0
    }
  }
}

-----------------------------------------------------------------
Abdulrahman Kaitoua
-----------------------------------------------------------------
Ph.D. Candidate at Politecnico Di Milano



> Subject: Re: Serialisation problem

> From: [hidden email]
> Date: Mon, 14 Dec 2015 10:42:22 +0100
> To: [hidden email]

Re: Serialisation problem

Fabian Hueske-2
Hi,

In your program, you apply a distinct transformation on a data set that has a (nested) GValue[] type. Distinct requires that all fields are comparable with each other. Therefore all fields of the data set's type must be valid key types.
However, Flink does not support object arrays as key types. This holds regardless of the type of the object and includes GValue[], whether it implements Comparable or not. Object array types simply can't be used as keys right now :-(
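
To illustrate why arrays make awkward keys, and one possible way around it, here is a minimal sketch in plain Scala collections with no Flink dependency. JVM arrays compare by reference, so the sketch derives a content-based key instead. The `Record` shape, the helper names and the separator are all assumptions made for illustration, and a separator that can occur inside the values would cause key collisions.

```scala
object ArrayKeys {
  // Hypothetical record shape: a simple key field plus an array payload.
  type Record = (Int, Array[String])

  // Derive a key that compares by content. The array itself compares by
  // reference on the JVM, so it is a poor key.
  def contentKey(r: Record): (Int, String) =
    (r._1, r._2.mkString("\u0001")) // separator is an arbitrary assumption

  // Keep the first record seen for each content key, in input order.
  def distinctByContent(records: Seq[Record]): Seq[Record] = {
    val seen = scala.collection.mutable.LinkedHashMap.empty[(Int, String), Record]
    records.foreach(r => seen.getOrElseUpdate(contentKey(r), r))
    seen.values.toSeq
  }
}
```

If your Flink version offers a key-selector variant of distinct, a function like contentKey could serve as the selector, since a tuple of Int and String contains only supported key types. I have not tested this against 0.10.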

I see two ways to add this functionality. In both cases you need to get a bit into the details of Flink's type system, serialization, and comparators.

1) You implement your own type information for GValue[] arrays, plus serializer and comparator and manually inject this information.

2) You extend ObjectArrayTypeInfo such that it supports key operations if the component type supports key operations. This would require implementing a TypeComparator and making some modifications to ObjectArrayTypeInfo. This change would also be a valuable contribution to Flink.
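
The core of either option is an element-wise comparison of two arrays. Below is a minimal sketch of that logic in plain Scala, assuming the elements implement java.lang.Comparable. It does not implement Flink's TypeComparator, which has more methods than comparison alone, and the name `compareArrays` is made up for this example.

```scala
object ArrayCompare {
  // Lexicographic comparison: the first differing element decides.
  // If one array is a prefix of the other, the shorter one sorts first.
  def compareArrays[T <: Comparable[T]](a: Array[T], b: Array[T]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val c = a(i).compareTo(b(i))
      if (c != 0) return c
      i += 1
    }
    Integer.compare(a.length, b.length)
  }
}
```

A comparator used for distinct or grouping would also need a hash that is consistent with this comparison, for example one computed from the elements rather than from the array reference.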

I recommend having a look at Flink's other type infos, serializers, and comparators to learn how this is done in Flink.

Best,
Fabian




2015-12-20 22:01 GMT+01:00 Abdulrahman kaitoua <[hidden email]>:
