Hi All
I have a simple Avro schema:

    {"namespace": "com.kpn.datalab.schemas.omnicrm.contact_history.v1",
     "type": "record",
     "name": "contactHistory",
     "fields": [
         {"name": "events", "type": {"type": "array", "items": "bytes"}},
         {"name": "krn", "type": "string"}
     ]
    }

I generate a Java POJO (gist: https://goo.gl/FtM7T6) from this file with avro_tools.jar (Apache Avro version 1.7.7):

    java -jar bin/avro_tools.jar compile schema <avrofile> generated/src/main/java

This POJO doesn't seem to want to be serialized by Flink when I pack the compiled class file into a fat-jar job file.

When I pass a certain byte array into the Avro deserializer, it works fine in a regular Scala application, but when I do the same in a Flink job it bombs with a typecast exception.

Scala code:

    val payload: AnyRef = kafkaEvent.payload() // this calls the Avro deserializer (gist: https://goo.gl/18UqJy)

    println("canonical name: " + payload.getClass.getCanonicalName)
    val chAvro =
      try {
        payload.asInstanceOf[contactHistory]
      } catch {
        case c: ClassCastException =>
          println(c.getMessage)
          throw c // rethrow after logging (assumption; the original snippet was truncated here)
      }

I get the following amazing error:

    canonical name: com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory
    com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory cannot be cast to com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory

I've verified the exact same code with the exact same byte-array input (checked by printing a hex dump from the Flink job), so the code itself is not the problem.

If I put a jar containing the POJO's class file in the Flink lib directory, my Flink job works fine!

Any ideas how I can keep my POJO in the fat jar? I don't want to restart Flink every time I add a new Avro schema.

Thanks

Bart
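The deserializer behind the second gist link is not inlined above; for a specific-record schema like this one, a typical Avro deserializer looks roughly as follows (a sketch only, assuming Avro 1.7.x and the generated contactHistory class; the actual gist code may differ):

    import org.apache.avro.io.DecoderFactory
    import org.apache.avro.specific.SpecificDatumReader
    import com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory

    // Decode a raw byte array into the generated contactHistory record.
    def deserialize(bytes: Array[Byte]): contactHistory = {
      val reader  = new SpecificDatumReader[contactHistory](classOf[contactHistory])
      val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
      reader.read(null, decoder)
    }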
Hi,
this "amazing error" message typically means that the class of the object instance you created was loaded by a different classloader than the one that loaded the class used in the code that tries the cast. A class in Java is fully identified by its canonical name AND the classloader that loaded it, which makes it possible for two classes with the same name and the same bytecode not to be instances of each other. Unfortunately, I have no concrete idea why that happens in your case, but maybe this info helps to track down the problem.

Best,
Stefan
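A minimal, self-contained sketch of the effect Stefan describes (the jar path is illustrative, and pojo.jar is assumed to bundle the generated POJO together with its Avro dependencies, since the loaders below have no parent to delegate to):

    import java.net.{URL, URLClassLoader}

    object TwoLoaders {
      def main(args: Array[String]): Unit = {
        val jar = Array(new URL("file:pojo.jar"))
        // Two independent loaders with no parent: neither delegates to
        // the application classloader, so each defines its own copy.
        val a = new URLClassLoader(jar, null)
        val b = new URLClassLoader(jar, null)

        val name = "com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory"
        val ca = a.loadClass(name)
        val cb = b.loadClass(name)

        println(ca.getCanonicalName == cb.getCanonicalName) // true  -- same name
        println(ca == cb)                                   // false -- different Class objects
        println(ca.isAssignableFrom(cb))                    // false -- so a cast between them fails
      }
    }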
Thanks for the explanation, at least it's a start.
Bart
It turns out that the Flink documentation has a whole section about this
problem: https://goo.gl/DMs9Dx

Now let's find the solution!

Bart
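A quick way to confirm that diagnosis from inside the job is to compare the two classloaders right next to the failing cast (a hypothetical debugging snippet, not from the original thread):

    // If these two lines print different loaders, the
    // "X cannot be cast to X" error is a classloading conflict,
    // not a problem with the bytes or the cast itself.
    println("payload class loaded by:  " + payload.getClass.getClassLoader)
    println("expected class loaded by: " + classOf[contactHistory].getClassLoader)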
Hi Bart,
Is serialisation failing in a Flink job (for example when writing to Kafka) or just in the main() method when experimenting with serialisation?

Best,
Aljoscha