problem with avro serialization

Debasish Ghosh
Hello -

I'm facing an issue with Avro serialization of Scala case classes generated by avrohugger.
Case classes generated by avrohugger have the Avro schema in the companion object. This is a sample generated class (details elided):

case class Data(var id: Int, var name: String) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(0, "")
  def get(field$: Int): AnyRef = {
    //..
  }
  def put(field$: Int, value: Any): Unit = {
    //..
  }
  def getSchema(): org.apache.avro.Schema = Data.SCHEMA$
}
object Data {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Data\",\"namespace\":\"pipelines.flink.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}")
}

Flink 1.8's Avro serializer fails on this, because Avro looks for a static SCHEMA$ field on the class itself and, using Java reflection, cannot find the SCHEMA$ defined in the companion object. This is the exception I get:

java.lang.RuntimeException: Serializing the source elements failed: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class pipelines.flink.avro.Data
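The lookup failure can be reproduced without Flink or Avro. The sketch below uses a hypothetical, trimmed stand-in for the generated class, with the schema kept as a plain String (an assumption, so no Avro dependency is needed). hasSchemaField is a hypothetical helper that approximates the getDeclaredField("SCHEMA$") check Avro's SpecificData appears to perform before reporting "Not a Specific class".

```scala
import scala.util.Try

// Hypothetical, trimmed stand-in for the avrohugger output. The schema is a
// plain String here (an assumption) so the sketch needs no Avro dependency.
case class Data(var id: Int, var name: String)

object Data {
  val SCHEMA$ : String =
    """{"type":"record","name":"Data","namespace":"pipelines.flink.avro","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"}]}"""
}

object ReflectionCheck {
  // Approximates the check Avro's SpecificData appears to make: a field
  // literally named SCHEMA$ declared on the given class.
  def hasSchemaField(cls: Class[_]): Boolean =
    Try(cls.getDeclaredField("SCHEMA$")).isSuccess

  def main(args: Array[String]): Unit = {
    // scalac keeps the val's backing field on the companion's class (Data$);
    // Data itself gets at most a static forwarder *method* named SCHEMA$.
    println(s"field on Data:  ${hasSchemaField(classOf[Data])}")
    println(s"field on Data$$: ${hasSchemaField(Data.getClass)}")
  }
}
```

Running main should print false for Data and true for Data$, which is consistent with Avro rejecting the case class as "Not a Specific class".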

Any help or a workaround would be appreciated.

regards.

Re: problem with avro serialization

Tzu-Li (Gordon) Tai
Hi,

Aljoscha opened a JIRA just recently for this issue: https://issues.apache.org/jira/browse/FLINK-12501.

Do you know if this is a regression from previous Flink versions?
I'm asking just to double-check, since from my understanding of the issue, the problem should have existed in earlier versions as well.

Thanks,
Gordon

On Sun, May 12, 2019 at 3:53 PM Debasish Ghosh <[hidden email]> wrote:

Re: problem with avro serialization

Debasish Ghosh
Hi Gordon -

I have only recently started trying out Flink 1.8, but this problem looks like it has existed for a long time. It's related to the way Flink handles Avro serialization, which I guess has not changed recently.

regards.

On Tue, May 14, 2019 at 2:22 PM Tzu-Li (Gordon) Tai <[hidden email]> wrote:

Re: problem with avro serialization

Debasish Ghosh
In reply to this post by Tzu-Li (Gordon) Tai
Any update on this?

regards.

Re: problem with avro serialization

Debasish Ghosh
From https://stackoverflow.com/a/56104518:

AFAIK the only solution is to update Flink to use Avro's non-reflection-based constructors in AvroInputFormat.
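As far as we understand, Avro's SpecificDatumReader and SpecificDatumWriter have constructors that take a Schema as well as ones that take a Class, and it is the Class-based ones that resolve SCHEMA$ reflectively. The schema still has to come from somewhere. One option, sketched here as an assumption rather than as what Flink or any fix actually does, is to instantiate the record through its no-arg constructor (the generated class has def this()) and call getSchema() on the instance. HasSchema, Data, and SchemaExtraction are hypothetical stand-ins so the sketch runs without Avro.

```scala
import scala.util.Try

// Hypothetical minimal interface standing in for SpecificRecord.getSchema().
trait HasSchema {
  def getSchema(): String
}

// Hypothetical stand-in shaped like the avrohugger output: public no-arg
// constructor, schema held in the companion object.
case class Data(var id: Int, var name: String) extends HasSchema {
  def this() = this(0, "")
  def getSchema(): String = Data.SCHEMA$
}

object Data {
  val SCHEMA$ : String = """{"type":"record","name":"Data","fields":[]}"""
}

object SchemaExtraction {
  // Java-style lookup: read a static SCHEMA$ field; this fails for Data.
  def viaStaticField(cls: Class[_]): Option[String] =
    Try(cls.getDeclaredField("SCHEMA$").get(null).asInstanceOf[String]).toOption

  // Instance-based lookup: build an instance with the no-arg constructor
  // and ask it for its schema.
  def viaInstance(cls: Class[_ <: HasSchema]): Option[String] =
    Try(cls.getDeclaredConstructor().newInstance().getSchema()).toOption

  // Prefer the static field (unchanged behavior for Java-generated classes)
  // and fall back to the instance only when that lookup fails.
  def extract(cls: Class[_ <: HasSchema]): Option[String] =
    viaStaticField(cls).orElse(viaInstance(cls))

  def main(args: Array[String]): Unit =
    println(extract(classOf[Data]))
}
```

The fallback is still reflective, but it relies only on the no-arg constructor and getSchema(), both of which the generated class provides. In the real class the result would be an org.apache.avro.Schema that could be handed to a schema-based constructor such as new SpecificDatumReader[Data](schema).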

I would love to know if there are any plans to fix this issue.

regards. 

On Thu, Aug 29, 2019 at 8:23 PM Debasish Ghosh <[hidden email]> wrote:

Re: problem with avro serialization

Aljoscha Krettek
Hi,

I cut a PR that should fix this issue for Avrohugger: https://github.com/apache/flink/pull/9565

Would you be able to build this and see if it solves your problem?

Best,
Aljoscha

On 30. Aug 2019, at 09:02, Debasish Ghosh <[hidden email]> wrote:

Re: problem with avro serialization

Debasish Ghosh
Thanks a lot. Sure, I can do a build with this PR and check.

regards.

On Fri, Aug 30, 2019 at 2:20 PM Aljoscha Krettek <[hidden email]> wrote:

Re: problem with avro serialization

Debasish Ghosh
Hello Aljoscha -

I made a comment on your PR (https://github.com/apache/flink/pull/9565/files#r319598469). With the suggested fix, it runs fine. Thanks.

regards.

On Fri, Aug 30, 2019 at 4:48 PM Debasish Ghosh <[hidden email]> wrote: