Serialization schema


Serialization schema

Mohit Anchlia
I wrote a key serialization class to write to Kafka; however, I am getting this error. I'm not sure why, as I've already implemented the interfaces.

Caused by: java.io.NotSerializableException: com.sy.flink.test.Tuple2Serializerr$1
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

And the class implements the following:

public class Tuple2Serializerr implements
        DeserializationSchema<Tuple2<Integer, Integer>>,
        SerializationSchema<Tuple2<Integer, Integer>> {

And called like this:


FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
        "10.22.4.15:9092",        // broker list
        "my-topic",               // target topic
        new Tuple2Serializerr()); // serialization schema



Re: Serialization schema

Biao Liu
Hi Mohit,
Since you did not share the full code of Tuple2Serializerr, my guess is that some of its fields do not implement Serializable.

Re: Serialization schema

Mohit Anchlia
Internally I only use a String, which I convert into bytes.

Re: Serialization schema

Tzu-Li (Gordon) Tai
Hi Mohit,

As Biao Liu (刘彪) pointed out in his reply, the problem is that your `Tuple2Serializerr` contains fields that are not serializable, so `Tuple2Serializerr` itself is not serializable.
Could you share your `Tuple2Serializerr` implementation with us so we can pinpoint the problem?

A snippet of the class fields and constructor will do; you don’t have to provide the whole `serialize` / `deserialize` implementation if you don’t want to.

Cheers,
Gordon


Re: Serialization schema

Mohit Anchlia
This is, at a high level, what I am doing:

Serialize:

String s = tuple.f0 + "," + tuple.f1;
return s.getBytes();

Deserialize:

String s = new String(message);
String[] sarr = s.split(",");
Tuple2<Integer, Integer> tuple = new Tuple2<>(Integer.valueOf(sarr[0]), Integer.valueOf(sarr[1]));

return tuple;
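
For reference, the round trip described above can be sketched without any Flink types. This is only an illustration under assumptions: `PairCodec` is a hypothetical helper name, the tuple is represented as two plain ints, and the charset is pinned to UTF-8 instead of relying on the platform default used by `getBytes()` and `new String(message)`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Flink): mirrors the serialize/deserialize
// logic from the post, using plain ints instead of Tuple2<Integer, Integer>.
final class PairCodec {
    private PairCodec() {}

    // Encodes the pair as "first,second" in UTF-8.
    static byte[] serialize(int first, int second) {
        return (first + "," + second).getBytes(StandardCharsets.UTF_8);
    }

    // Decodes "first,second" back into a two-element array.
    static int[] deserialize(byte[] message) {
        String[] parts = new String(message, StandardCharsets.UTF_8).split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected exactly two comma-separated values");
        }
        return new int[] {
            Integer.parseInt(parts[0].trim()),
            Integer.parseInt(parts[1].trim())
        };
    }
}
```

Note that nothing in this logic keeps state, so the serialize/deserialize bodies by themselves would not explain a `NotSerializableException`; that points at the fields of the schema class instead.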


Re: Serialization schema

Tzu-Li (Gordon) Tai
Since I don’t have your complete code, this is a guess:
Is your `Tuple2Serializerr` an inner class? If so, you should be able to solve the problem by declaring it `static`.

This is more of a Java problem than a Flink one:
A non-static inner class holds an implicit reference to its enclosing outer class instance, so serializing it also serializes the outer instance, which fails if the outer class is not serializable.
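
A minimal sketch of that difference, using only the JDK. All names here (`Outer`, `InnerSchema`, `NestedSchema`, `canSerialize`) are hypothetical and stand in for a job class and its schema; this is not the code from the thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Outer stands in for an enclosing class that is NOT Serializable.
class Outer {
    private final String topicName = "my-topic";

    // Non-static inner class: every instance carries a hidden reference to
    // the Outer instance that created it (used here via Outer.this).
    class InnerSchema implements Serializable {
        String topic() {
            return Outer.this.topicName;
        }
    }

    // Static nested class: no hidden reference to an Outer instance.
    static class NestedSchema implements Serializable {
    }

    InnerSchema newInner() {
        return new InnerSchema();
    }

    // Returns true if plain Java serialization of obj succeeds.
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With these definitions, `Outer.canSerialize(new Outer().newInner())` fails because the hidden `Outer` reference is written too, while `Outer.canSerialize(new Outer.NestedSchema())` succeeds.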


Re: Serialization schema

Mohit Anchlia
But it is not an inner class.

Re: Serialization schema

Tzu-Li (Gordon) Tai
Thanks for clarifying. 

From the looks of your exception:

Caused by: java.io.NotSerializableException: com.sy.flink.test.Tuple2Serializerr$1
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

The `$1` suffix in `com.sy.flink.test.Tuple2Serializerr$1` indicates that an anonymous inner class declared inside `Tuple2Serializerr` is what is not serializable.

Could you check if that’s the case?
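
To illustrate how such a `$1` name can show up, here is a small JDK-only sketch. The class `SchemaWithAnonymous` and its `callback` field are hypothetical; the actual field in `Tuple2Serializerr` is not known from the thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical schema-like class that is Serializable itself but holds a
// field whose value is an instance of an anonymous class.
class SchemaWithAnonymous implements Serializable {
    // The compiler names anonymous classes <EnclosingClass>$<number>.
    // Runnable is not Serializable, so neither is this anonymous class.
    private final Runnable callback = new Runnable() {
        @Override
        public void run() {
            // no-op
        }
    };

    String callbackClassName() {
        return callback.getClass().getName();
    }

    // Tries to serialize obj and returns the exception message on failure
    // (the message starts with the offending class name), or null on success.
    static String failingClassName(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Serializing a `SchemaWithAnonymous` instance fails on the `callback` field, and the reported class name is that of the anonymous class rather than of the schema itself, which matches the shape of the exception above.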


Re: Serialization schema

Mohit Anchlia
There was a private member variable that was not serializable and was not marked transient. Thanks for the pointer.
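
For anyone hitting the same error, one common shape of this fix is sketched below. It is an assumption-laden example, not the actual class from this thread: `CharsetAwareSchema` is a hypothetical name, and `java.nio.charset.Charset` is used only as a well-known JDK type that does not implement `Serializable`. The field is marked `transient` and rebuilt lazily from a serializable `String`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;

// Hypothetical schema-like class with one non-serializable member.
class CharsetAwareSchema implements Serializable {
    private static final long serialVersionUID = 1L;

    // Serializable state: enough information to rebuild the helper.
    private final String charsetName;

    // Charset is not Serializable; transient fields are skipped by Java
    // serialization and are null again after deserialization.
    private transient Charset charset;

    CharsetAwareSchema(String charsetName) {
        this.charsetName = charsetName;
    }

    // Lazily (re)creates the transient helper on first use.
    private Charset charset() {
        if (charset == null) {
            charset = Charset.forName(charsetName);
        }
        return charset;
    }

    byte[] serialize(int first, int second) {
        return (first + "," + second).getBytes(charset());
    }

    // Serializes and deserializes the schema, similar to what happens when
    // a schema instance is shipped to another JVM.
    static CharsetAwareSchema roundTrip(CharsetAwareSchema schema) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(schema);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CharsetAwareSchema) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The lazy accessor matters because a transient field is not restored on the receiving side; initializing it only in the constructor would leave it null after deserialization.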
