KafkaProducer with generic (Avro) serialization schema

KafkaProducer with generic (Avro) serialization schema

Wouter Zorgdrager
Dear reader,

I'm currently writing a KafkaProducer that is able to serialize a generic type using avro4s.
However, the serialization schema itself is not serializable. Here is my code:

The serialization schema:
import java.io.ByteArrayOutputStream

import com.sksamuel.avro4s.{AvroOutputStream, FromRecord, SchemaFor, ToRecord}
import org.apache.flink.streaming.util.serialization.SerializationSchema

class AvroSerializationSchema[IN: SchemaFor : FromRecord : ToRecord] extends SerializationSchema[IN] {

  override def serialize(element: IN): Array[Byte] = {
    val byteArray = new ByteArrayOutputStream()
    val avroSer = AvroOutputStream.binary[IN](byteArray)
    avroSer.write(element)
    avroSer.flush()
    avroSer.close()
    byteArray.toByteArray
  }
}

The job code:
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

case class Person(name: String, age: Int, address: Address)
case class Address(city: String, street: String)

class SimpleJob {

  @transient
  private lazy val serSchema : AvroSerializationSchema[Person] = new AvroSerializationSchema[Person]()

  def start() = {
    val testPerson = Person("Test", 100, Address("Test", "Test"))

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.
      fromCollection(Seq(testPerson)).
      addSink(createKafkaSink())

    env.execute("Flink sample job")
  }


  def createKafkaSink() : RichSinkFunction[Person] = {
    //set some properties
    val properties = new Properties()
    properties.put("bootstrap.servers", "127.0.0.1:9092")
    properties.put("zookeeper.connect", "127.0.0.1:2181")

    new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
  }

}

The code compiles, but at runtime it fails with: InvalidProgramException: Object org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d is not serializable.

I assume this means that my custom SerializationSchema is not serializable due to the use of SchemaFor, FromRecord, and ToRecord.
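That assumption can be reproduced without Flink or avro4s. A context bound desugars to an implicit constructor parameter, so the evidence instance becomes a field of the schema object and gets dragged into Java serialization with it. A minimal self-contained sketch (all names are hypothetical stand-ins for the avro4s type classes):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for SchemaFor / ToRecord / FromRecord: note it is NOT Serializable.
trait Evidence[T]

// `class Schema[T: Evidence]` desugars to exactly this: the evidence arrives as
// an implicit constructor parameter and is retained as a field.
class Schema[T](implicit val ev: Evidence[T]) extends Serializable

object ContextBoundDemo {
  implicit val stringEvidence: Evidence[String] = new Evidence[String] {}

  def run(): String = {
    val schema = new Schema[String] // captures stringEvidence in a field
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(schema)
      "serializable"
    } catch {
      case _: NotSerializableException => "not serializable"
    }
  }
}
```

Writing the object to an ObjectOutputStream is roughly the check Flink performs on the sink before submitting the job, which is why the failure shows up at submission time rather than inside a task.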
Does anyone know a solution or workaround?

Thanks in advance!
Wouter

RE: KafkaProducer with generic (Avro) serialization schema

Nortman, Bill

The first thing I would try: check whether your Person and Address classes have getters and setters and a no-argument constructor.

 

This message contains confidential information and is intended only for the individual named. If you are not the named addressee, you should not disseminate, distribute, alter or copy this e-mail. Please notify the sender immediately by e-mail if you have received this e-mail by mistake and delete this e-mail from your system. E-mail transmissions cannot be guaranteed to be secure or without error as information could be intercepted, corrupted, lost, destroyed, arrive late or incomplete, or contain viruses. The sender, therefore, does not accept liability for any errors or omissions in the contents of this message which arise during or as a result of e-mail transmission. If verification is required, please request a hard-copy version. This message is provided for information purposes and should not be construed as a solicitation or offer to buy or sell any securities or related financial instruments in any jurisdiction.  Securities are offered in the U.S. through PIMCO Investments LLC, distributor and a company of PIMCO LLC.

The individual providing the information herein is an employee of Pacific Investment Management Company LLC ("PIMCO"), an SEC-registered investment adviser.  To the extent such individual advises you regarding a PIMCO investment strategy, he or she does so as an associated person of PIMCO.  To the extent that any information is provided to you related to a PIMCO-sponsored investment fund ("PIMCO Fund"), it is being provided to you in the individual's capacity as a registered representative of PIMCO Investments LLC ("PI"), an SEC-registered broker-dealer.  PI is not registered, and does not intend to register, as a municipal advisor and therefore does not provide advice with respect to the investment of the proceeds of municipal securities or municipal escrow investments.  In addition, unless otherwise agreed by PIMCO, this communication and any related attachments are being provided on the express basis that they will not cause PIMCO LLC, or its affiliates, to become an investment advice fiduciary under ERISA or the Internal Revenue Code.


Re: KafkaProducer with generic (Avro) serialization schema

Wouter Zorgdrager
Hi Bill,

Thanks for your answer. However, this proposal won't solve my issue: the context bounds I need in order to serialize to Avro (SchemaFor, ToRecord, and FromRecord) aren't serializable classes. As a result, Flink cannot serialize the KafkaProducer, which fails the whole job.

Thanks,
Wouter



Re: KafkaProducer with generic (Avro) serialization schema

Wouter Zorgdrager
So, I'm still struggling with this issue. I dived a bit deeper into the problem and I'm pretty sure the cause is that I have to (implicitly) pass the SchemaFor and ToRecord instances to my serialization schema (otherwise I can't make it generic). However, those classes aren't serializable, and of course I can't annotate them @transient nor make them lazy vals, which leaves me with the current issue.

I hope someone has some leads for me. 

Thanks!



Re: KafkaProducer with generic (Avro) serialization schema

Piotr Nowojski
Hi,

My Scala knowledge is very limited (and my knowledge of Scala serialization is non-existent), but one way or another you have to make your SerializationSchema serializable. If this is indeed the problem, maybe a better place to ask is Stack Overflow or a Scala-specific mailing list/board (unless someone else from the Flink community can provide an answer)?

Piotrek




Re: KafkaProducer with generic (Avro) serialization schema

Aljoscha Krettek
Hi,

Piotr is right: the SerializationSchema has to be serializable, which means that the implicit values passed in for SchemaFor[IN], FromRecord[IN], and ToRecord[IN] need to be serializable. Is there no way of making those serializable? As a workaround, you could have a serializable factory for those types, use the factory as the context bound, and then lazily create the SchemaFor, FromRecord, and ToRecord from it.
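A self-contained sketch of that factory idea, with a stand-in Evidence type in place of the avro4s type classes (all names hypothetical): the factory is a stateless serializable singleton that re-derives the evidence on demand, and the schema keeps only a @transient lazy val, so nothing non-serializable travels with the job graph.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

trait Evidence[T] { def describe: String } // stand-in for SchemaFor etc.; not Serializable

// The factory carries no state, so it serializes trivially; the evidence is
// created fresh on each task manager instead of being shipped over the wire.
trait EvidenceFactory[T] extends Serializable {
  def evidence: Evidence[T]
}

class LazySchema[T](factory: EvidenceFactory[T]) extends Serializable {
  @transient private lazy val ev: Evidence[T] = factory.evidence // rebuilt after deserialization
  def serialize(element: T): String = s"${ev.describe}:$element"
}

object FactoryDemo {
  case class Person(name: String)

  object PersonFactory extends EvidenceFactory[Person] {
    def evidence: Evidence[Person] = new Evidence[Person] { def describe = "person-schema" }
  }

  private def roundTrip[A](a: A): A = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(a); out.close()
    new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray)).readObject().asInstanceOf[A]
  }

  // Simulates what Flink does: serialize the schema, ship it, then use it remotely.
  def run(): String = roundTrip(new LazySchema(PersonFactory)).serialize(Person("Test"))
}
```

The cost is one factory object per concrete type; the gain is that derivation happens where the schema is used, not where the job is built.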

What do you think?

Aljoscha





Re: KafkaProducer with generic (Avro) serialization schema

Fabian Hueske
In reply to this post by Piotr Nowojski
Hi Wouter,

you can try to make the SerializationSchema serializable by overriding Java's serialization methods writeObject() and readObject(), similarly to what Flink's AvroRowSerializationSchema [1] does.
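A self-contained sketch of that approach (Codec is a hypothetical stand-in for the non-serializable avro4s machinery): keep the non-serializable member @transient, write only the plain state in writeObject(), and rebuild the member in readObject():

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

class Codec(val name: String) // stand-in for the avro4s machinery; not Serializable

class CustomSchema(codecName: String) extends Serializable {
  @transient private var codec: Codec = new Codec(codecName)
  def describe: String = codec.name

  // Java serialization hook: default-write the serializable state only
  // (the @transient codec is skipped automatically)...
  private def writeObject(out: ObjectOutputStream): Unit =
    out.defaultWriteObject()

  // ...and reconstruct the transient member from that state on the receiving side.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    codec = new Codec(codecName)
  }
}

object HooksDemo {
  private def roundTrip[A](a: A): A = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(a); out.close()
    new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray)).readObject().asInstanceOf[A]
  }

  def run(): String = roundTrip(new CustomSchema("avro")).describe
}
```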


2018-05-02 16:34 GMT+02:00 Piotr Nowojski <[hidden email]>:
Hi,

My Scala knowledge is very limited (and my Scala's serialization knowledge is non existent), but one way or another you have to make your SerializationSchema serialisable. If indeed this is the problem, maybe a better place to ask this question is on Stack Overflow or some scala specific mailing list/board (unless someone else from the Flink's community can provide an answer to this problem)? 

Piotrek

On 1 May 2018, at 16:30, Wouter Zorgdrager <[hidden email]> wrote:

So, I'm still struggling with this issue. I dived a bit more into the problem and I'm pretty sure that the problem is that I have to (implicitly) pass the SchemaFor and RecordTo classes to my serialization schema (otherwise I can't make it generic). However those class aren't serializable, but of course I can't annotate them transient nor make it a lazy val which gives me the current issue. 

I hope someone has some leads for me. 

Thanks!

Op do 26 apr. 2018 om 23:03 schreef Wouter Zorgdrager <[hidden email]>:
Hi Bill,

Thanks for your answer. However this proposal isn't going to solve my issue, since the problem here is that the context bounds I need to give in order to serialize it to Avro (SchemaFor, ToRecord and FromRecord) aren't serializable classes. This results in Flink not being able to serialize the KafkaProducer failing the whole job. 

Thanks,
Wouter

Op do 26 apr. 2018 om 00:42 schreef Nortman, Bill <[hidden email]>:

The things I would try would first in you are you class Person and Address have getters and setters and a no argument constructor.

 

From: Wouter Zorgdrager [mailto:[hidden email]]
Sent: Wednesday, April 25, 2018 7:17 AM
To: [hidden email]
Subject: KafkaProducer with generic (Avro) serialization schema

 

Dear reader,

 

I'm currently working on writing a KafkaProducer which is able to serialize a generic type using avro4s.

However this serialization schema is not serializable itself. Here is my code for this:

 

The serialization schema:

class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord] extends SerializationSchema[IN] {

 

  override def serialize(element: IN): Array[Byte] = {

    val byteArray = new ByteArrayOutputStream()

    val avroSer = AvroOutputStream.binary[IN](byteArray)

    avroSer.write(element)

    avroSer.flush()

    avroSer.close()

 

    return byteArray.toByteArray

  }

}

 

The job code:

case class Person(name: String, age: Int, address: Address)
case class Address(city: String, street: String)

class SimpleJob {

  @transient
  private lazy val serSchema: AvroSerializationSchema[Person] = new AvroSerializationSchema[Person]()

  def start() = {
    val testPerson = Person("Test", 100, Address("Test", "Test"))

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.
      fromCollection(Seq(testPerson)).
      addSink(createKafkaSink())

    env.execute("Flink sample job")
  }

  def createKafkaSink(): RichSinkFunction[Person] = {
    // set some properties
    val properties = new Properties()
    properties.put("bootstrap.servers", "127.0.0.1:9092")
    properties.put("zookeeper.connect", "127.0.0.1:2181")

    new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
  }

}

 

The code compiles, but at runtime it gives the following error: InvalidProgramException: Object org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d is not serializable.

 

I assume this means that my custom SerializationSchema is not serializable due to the use of SchemaFor, FromRecord and ToRecord. 

Does anyone know a solution or workaround?

 

Thanks in advance!

Wouter

This message contains confidential information and is intended only for the individual named. If you are not the named addressee, you should not disseminate, distribute, alter or copy this e-mail. Please notify the sender immediately by e-mail if you have received this e-mail by mistake and delete this e-mail from your system. E-mail transmissions cannot be guaranteed to be secure or without error as information could be intercepted, corrupted, lost, destroyed, arrive late or incomplete, or contain viruses. The sender, therefore, does not accept liability for any errors or omissions in the contents of this message which arise during or as a result of e-mail transmission. If verification is required, please request a hard-copy version. This message is provided for information purposes and should not be construed as a solicitation or offer to buy or sell any securities or related financial instruments in any jurisdiction.  Securities are offered in the U.S. through PIMCO Investments LLC, distributor and a company of PIMCO LLC.

The individual providing the information herein is an employee of Pacific Investment Management Company LLC ("PIMCO"), an SEC-registered investment adviser.  To the extent such individual advises you regarding a PIMCO investment strategy, he or she does so as an associated person of PIMCO.  To the extent that any information is provided to you related to a PIMCO-sponsored investment fund ("PIMCO Fund"), it is being provided to you in the individual's capacity as a registered representative of PIMCO Investments LLC ("PI"), an SEC-registered broker-dealer.  PI is not registered, and does not intend to register, as a municipal advisor and therefore does not provide advice with respect to the investment of the proceeds of municipal securities or municipal escrow investments.  In addition, unless otherwise agreed by PIMCO, this communication and any related attachments are being provided on the express basis that they will not cause PIMCO LLC, or its affiliates, to become an investment advice fiduciary under ERISA or the Internal Revenue Code.




Re: KafkaProducer with generic (Avro) serialization schema

Wouter Zorgdrager
Hi, 

Thanks for the suggestions.

Unfortunately I cannot make FromRecord/ToRecord/SchemaFor serializable, since those classes are out of my control. I use them from the avro4s library (https://github.com/sksamuel/avro4s). The problem, especially on the deserializer side, is that I need to convert an Avro 'GenericRecord' to a Scala case class. Avro is written in Java, which makes that a bit problematic, and that's why I need the avro4s library. Avro4s tries to verify at compile time whether the generic type is actually convertible from/to a generic record, which is why I need those context bounds. 
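[To make the mechanics concrete: a context bound desugars to an implicit constructor parameter, which the compiler stores as a field, so a non-serializable typeclass instance is dragged into the serialized closure. The sketch below is self-contained; the `Codec` trait is a made-up stand-in for avro4s's SchemaFor/ToRecord/FromRecord.]

```scala
import java.io._

// A typeclass whose instance is NOT Serializable, standing in for avro4s's
// SchemaFor/ToRecord/FromRecord (the trait and instance here are made up).
trait Codec[T] { def encode(t: T): String }
object Codec {
  implicit val intCodec: Codec[Int] = new Codec[Int] {
    def encode(t: Int): String = t.toString
  }
}

// `class Schema[T: Codec]` desugars to `class Schema[T](implicit ev: Codec[T])`,
// so the instance is captured as a constructor field of Schema.
class Schema[T: Codec] extends Serializable {
  def serialize(t: T): String = implicitly[Codec[T]].encode(t)
}

object Demo {
  // Returns true iff `obj` survives Java serialization.
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit =
    // Schema itself extends Serializable, but the captured Codec field
    // pulls in the non-serializable instance and the write fails.
    println(isJavaSerializable(new Schema[Int])) // prints "false"
}
```

[This is why marking the job-level field transient doesn't help: the offending reference lives inside the schema object itself, not in the enclosing class.]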

As for @Aljoscha's workaround, I don't understand how this would solve it; doesn't it just move the problem? If I create a factory, I still need the generic type (with context bounds) that I specify for my KafkaConsumer/deserialization schema.

@Fabian I'm not sure I understand your proposal. I still need the context bounds for those compile-time macros of Avro4s. 

Once again, thanks for your help so far!

Regards,
Wouter



On Wed, 2 May 2018 at 16:48, Fabian Hueske <[hidden email]> wrote:
Hi Wouter,

you can try to make the SerializationSchema serializable by overriding Java's serialization methods writeObject() and readObject(), similar to what Flink's AvroRowSerializationSchema [1] does.
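[A minimal self-contained sketch of that writeObject()/readObject() pattern: serialize only the data needed to rebuild the non-serializable part, and reconstruct it on the receiving side. The `Helper` class here is a made-up stand-in for the Avro writer that AvroRowSerializationSchema rebuilds.]

```scala
import java.io._

// Non-serializable helper, standing in for an Avro datum writer
// (this stub is illustrative, not part of any real API).
class Helper(val tag: String) {
  def greet: String = s"helper:$tag"
}

// Keep only the data needed to rebuild the helper (`tag`) in the serialized
// form, and reconstruct the helper after deserialization.
class CustomSchema(tag: String) extends Serializable {
  @transient private var helper: Helper = new Helper(tag)

  def use(): String = helper.greet

  private def writeObject(out: ObjectOutputStream): Unit =
    out.defaultWriteObject() // writes `tag`, skips the transient helper

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    helper = new Helper(tag) // rebuild the non-serializable part
  }
}

object CustomSchemaDemo {
  // Round-trip a CustomSchema through Java serialization and use it.
  def roundTrip(): String = {
    val bytes = new ByteArrayOutputStream()
    val out   = new ObjectOutputStream(bytes)
    out.writeObject(new CustomSchema("avro"))
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[CustomSchema].use()
  }

  def main(args: Array[String]): Unit = println(roundTrip()) // prints "helper:avro"
}
```

[The catch for the avro4s case is the same as before: readObject() must be able to re-derive the typeclass instances from plain serializable data, which the macro-materialized context bounds don't readily allow.]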


2018-05-02 16:34 GMT+02:00 Piotr Nowojski <[hidden email]>:
Hi,

My Scala knowledge is very limited (and my knowledge of Scala's serialization is non-existent), but one way or another you have to make your SerializationSchema serializable. If this is indeed the problem, maybe a better place to ask is Stack Overflow or a Scala-specific mailing list/board (unless someone else from the Flink community can provide an answer)? 

Piotrek

On 1 May 2018, at 16:30, Wouter Zorgdrager <[hidden email]> wrote:

So, I'm still struggling with this issue. I dug a bit deeper into the problem, and I'm fairly sure the cause is that I have to (implicitly) pass the SchemaFor and ToRecord instances to my serialization schema (otherwise I can't make it generic). However, those classes aren't serializable, and of course I can't annotate them as transient or make them lazy vals, which leaves me with the current issue. 

I hope someone has some leads for me. 

Thanks!
