Flink doesn't seem to serialize generated Avro POJO


Flink doesn't seem to serialize generated Avro POJO

Bart van Deenen
Hi All

I have a simple Avro schema:

{"namespace": "com.kpn.datalab.schemas.omnicrm.contact_history.v1",
 "type": "record",
 "name": "contactHistory",
 "fields": [
     {"name": "events",     "type": {"type":"array",            
     "items": "bytes"}},
     {"name": "krn",      "type": "string"}
 ]
}

I generate a Java POJO (gist: https://goo.gl/FtM7T6) from this schema with Avro tools (running the jar without arguments reports "Version 1.7.7 of Apache Avro"):

    java -jar bin/avro_tools.jar compile schema <avrofile> generated/src/main/java

This POJO doesn't seem to serialize in Flink when I pack the compiled
class file into the fat job jar.

When I pass a certain byte array into the Avro deserializer, it works
fine in a regular Scala application, but when I do the same in a Flink
job it bombs with a ClassCastException.

Scala code:

      val payload: AnyRef = kafkaEvent.payload()  // calls the Avro deserializer (gist: https://goo.gl/18UqJy)

      println("canonical name: " + payload.getClass.getCanonicalName)
      val chAvro =
        try {
          payload.asInstanceOf[contactHistory]
        } catch {
          case c: ClassCastException =>
            println(c.getMessage)
            null  // fall through with null so the val still typechecks
        }
I get the following amazing error:

"canonical name: " +
com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory        
com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory cannot
be cast to
com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory

I've verified that it is the exact same code with the exact same
byte-array input (checked by printing a hexdump from the Flink job), so
the code itself is not the problem.
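
To make the mismatch visible, the classloader on each side of the cast
can be printed right next to it (a two-line diagnostic, using only the
names from the snippet above):

      // which classloader owns each side of the cast?
      println("payload class loaded by: " + payload.getClass.getClassLoader)
      println("cast target loaded by:   " + classOf[contactHistory].getClassLoader)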

If I put a jar containing the POJO's class file into the Flink lib
directory, my Flink job works fine!

Any ideas how I can keep the POJO in the fat jar instead? I don't want
to restart Flink every time I add a new Avro schema.

Thanks

Bart

Re: Flink doesn't seem to serialize generated Avro POJO

Stefan Richter
Hi,

this „amazing error“ message typically means that the class of the object instance you created was loaded by a different classloader than the one that loaded the class in the code that tries to cast it. A class in Java is fully identified by its canonical class name AND the classloader that loaded it, which makes it possible for two classes with the same name and bytecode not to be instances of each other. Unfortunately, I have no concrete idea why that happens in your case, but maybe this info helps to track down the problem.
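
A minimal sketch of that effect (assuming the compiled contactHistory class sits under /path/to/classes, a placeholder): loading the same class file through two unrelated classloaders yields two distinct Class objects, and an instance of one cannot be cast to the other:

    import java.net.{URL, URLClassLoader}

    object TwoLoaders extends App {
      // placeholder path: a directory that contains the compiled class files
      val classDir = Array(new URL("file:/path/to/classes/"))

      // two independent loaders with parent = null, so neither delegates to
      // the application classloader and each defines the class itself
      val loaderA = new URLClassLoader(classDir, null)
      val loaderB = new URLClassLoader(classDir, null)

      val name = "com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory"
      val clsA = loaderA.loadClass(name)
      val clsB = loaderB.loadClass(name)

      println(clsA.getName == clsB.getName)  // true: same canonical name
      println(clsA == clsB)                  // false: different defining loaders

      // an instance made from clsA is not an instance of clsB, which is
      // exactly the "contactHistory cannot be cast to contactHistory" case
      // (the Avro-generated class has a public no-arg constructor)
      println(clsB.isInstance(clsA.newInstance()))  // false
    }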

Best,
Stefan


Re: Flink doesn't seem to serialize generated Avro POJO

Bart van Deenen
Thanks for the explanation; at least it's a start.

Bart


Re: Flink doesn't seem to serialize generated Avro POJO

Bart van Deenen
In reply to this post by Stefan Richter
It turns out that the Flink documentation has a whole section about this
problem: https://goo.gl/DMs9Dx
Now let's find the solution!
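
If the culprit is Avro resolving (and caching) the generated class through the wrong classloader, one scenario a classloading section like that typically covers, a fix worth trying is handing Avro the user-code classloader explicitly. A minimal sketch, assuming the deserializer uses Avro's SpecificDatumReader as in the gist, with bytes standing for the raw Kafka payload:

    import org.apache.avro.io.DecoderFactory
    import org.apache.avro.specific.{SpecificData, SpecificDatumReader}

    // resolve the generated class through the classloader that loaded the
    // job's fat jar, instead of one Avro may have picked up (and cached)
    val userCodeClassLoader = classOf[contactHistory].getClassLoader
    val specificData = new SpecificData(userCodeClassLoader)

    val schema = contactHistory.getClassSchema
    val reader = new SpecificDatumReader[contactHistory](schema, schema, specificData)

    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)  // bytes = raw Kafka payload
    val chAvro: contactHistory = reader.read(null, decoder)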

Bart


Re: Flink doesn't seem to serialize generated Avro POJO

Aljoscha Krettek
Hi Bart,

Is serialisation failing in a Flink job (for example when writing to Kafka) or just in the main() method when experimenting with serialisation?
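
For reference, a minimal sketch of the distinction (with a dummy deserialize standing in for the gist's Avro code): main() runs in the client JVM with the application classloader, while the map function runs on a TaskManager under Flink's user-code classloader:

    import org.apache.flink.streaming.api.scala._

    object WhereDoesItFail {
      // stand-in for the gist's Avro deserializer; returns a dummy value here
      def deserialize(bytes: Array[Byte]): AnyRef = new String(bytes, "UTF-8")

      def main(args: Array[String]): Unit = {
        val sample = Array[Byte](0x01, 0x02)  // placeholder payload

        // (a) calling deserialize(sample) here runs in the client JVM,
        //     with the application classloader

        // (b) calling it inside an operator runs on a TaskManager,
        //     under Flink's user-code classloader
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.fromElements(sample)
          .map(bytes => deserialize(bytes).getClass.getCanonicalName)
          .print()
        env.execute("where-does-it-fail")
      }
    }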

Best,
Aljoscha
