Serialization and Deserialization of Avro messages stored in Kafka

Serialization and Deserialization of Avro messages stored in Kafka

Filipe Couto
Hello,

I have a few topics that I want to read from Kafka, each consisting mainly of a key-value pair: timestamp (key) and value (byte array).

The byte array doesn't really have a class to deserialize into, since the Avro record comes from a "SELECT * FROM..." over several SQL tables, and each topic holds one of those tables.

We're using a GenericRecord, and since the topic name tells us which table we're reading, we know the column names and can access fields like this: genericRecord.get("COLUMN_NAME").toString()
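For context, roughly what our plain (non-Flink) consumer does, with Bijection handling the Avro decoding (a simplified sketch; kafkaProps and the Long key type are illustrative, not our exact code):

import java.util.Collections;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

// ...

Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

try (KafkaConsumer<Long, byte[]> consumer = new KafkaConsumer<>(kafkaProps)) {
    consumer.subscribe(Collections.singletonList(baseTopic));
    for (ConsumerRecord<Long, byte[]> record : consumer.poll(1000)) {
        // Decode the raw bytes back into a GenericRecord and read a column by name.
        GenericRecord genericRecord = recordInjection.invert(record.value()).get();
        System.out.println(genericRecord.get("COLUMN_NAME").toString());
    }
}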

Given this, we're now trying to read a Kafka topic using Flink, and we have this:

The environment is the StreamExecutionEnvironment, and the properties hold the Kafka serializer/deserializer settings plus the Kafka and ZooKeeper addresses.

class...

DataStream<Object> messageStream = environment
        .addSource(new FlinkKafkaConsumer010<>(baseTopic, new MyDeserializationSchema(schema), properties));

messageStream.print();

try {
    environment.execute();
} catch (Exception e) {
    e.printStackTrace();
}

        return false;
    }
}

class MyDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final Logger log = LoggerFactory.getLogger(MyDeserializationSchema.class);

    private final Class<T> avrotype = (Class<T>) org.apache.avro.generic.GenericRecord.class;
    private final Schema schema;

    public MyDeserializationSchema(Schema schema) {
        this.schema = schema;
    }

    @Override
    public T deserialize(byte[] arg0) throws IOException {
        log.info("Starting deserialization");
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
        log.info(recordInjection.toString());
        GenericRecord genericRecord = recordInjection.invert(arg0).get();
        log.info(genericRecord.toString());
        return (T) genericRecord;
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avrotype);
    }
}

Executing this on our server generates the following:

[2018-03-02 15:59:37,111] WARN Ignoring configured key DeSerializer (key.deserializer) (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09)

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumer09 is not serializable. The object probably contains or references non serializable fields.
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1460)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1404)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1386)
        at com.i2s.analytics.flink.executors.LKTreatyExecutor.execute(LKTreatyExecutor.java:153)
        at com.i2s.analytics.flink.job.DependenciesConsumer.main(DependenciesConsumer.java:66)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
        ... 6 more


I can't understand why the logs refer to FlinkKafkaConsumer09 when we're using FlinkKafkaConsumer010.
Also, how can we deserialize to a GenericRecord, so we can access the record fields the way we do when reading the Kafka topic without Flink?


Thanks in advance for any help.

Re: Serialization and Deserialization of Avro messages stored in Kafka

Gordon Weakliem
FlinkKafkaConsumer010 extends FlinkKafkaConsumer09, so I'd guess the code doing that logging lives in the superclass, which is why the log says 09.

I've seen this error a bunch, and it means MyDeserializationSchema isn't serializable, or more likely one of its fields isn't, or one of the fields of its fields: everything in the object graph has to be serializable.

Probably the easiest way to catch that is to write a unit test asserting that MyDeserializationSchema is serializable, essentially a test that ObjectOutputStream.writeObject succeeds (see the sketch below). That's a pretty useful test, because a change that breaks MyDeserializationSchema's serializability then fails in the test phase instead of at the deploy/run stage.
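Something along these lines (a minimal sketch; the JUnit 4 dependency and the dummy one-column schema are my assumptions, not taken from your code):

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.junit.Test;

public class MyDeserializationSchemaTest {

    @Test
    public void isJavaSerializable() throws Exception {
        // Any schema will do; this dummy record stands in for one of your tables.
        Schema avroSchema = SchemaBuilder.record("Row")
                .fields().requiredString("COLUMN_NAME").endRecord();

        MyDeserializationSchema<GenericRecord> schema =
                new MyDeserializationSchema<>(avroSchema);

        // This mirrors what Flink's ClosureCleaner does at job submission:
        // writeObject throws NotSerializableException if anything in the
        // object graph isn't Serializable.
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(schema);
        }
    }
}

With the Schema field declared the way it is in your snippet, this test fails with the same NotSerializableException, which is exactly the point: you find out before you deploy.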


Re: Serialization and Deserialization of Avro messages stored in Kafka

Tzu-Li Tai
Hi Filipe,

What Gordon mentioned is correct. Did you manage to fix the issue?

From your code snippet, it looks like the `Schema` field is not serializable: org.apache.avro.Schema doesn't implement Serializable, which matches the NotSerializableException on Schema$RecordSchema in your stack trace. Could you double check that?
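A common workaround is to hold the schema as its JSON string (a String serializes fine) and re-parse it lazily on the task side. A rough sketch, untested against your setup, and note the DeserializationSchema import path may differ depending on your Flink version:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class GenericRecordDeserializationSchema implements DeserializationSchema<GenericRecord> {

    // A String field is Serializable; an org.apache.avro.Schema field is not.
    private final String schemaJson;

    // Rebuilt lazily on each task after this object has been shipped to the cluster.
    private transient Injection<GenericRecord, byte[]> recordInjection;

    public GenericRecordDeserializationSchema(Schema schema) {
        this.schemaJson = schema.toString();
    }

    @Override
    public GenericRecord deserialize(byte[] message) throws IOException {
        if (recordInjection == null) {
            Schema schema = new Schema.Parser().parse(schemaJson);
            recordInjection = GenericAvroCodecs.toBinary(schema);
        }
        return recordInjection.invert(message).get();
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }
}

The stream can then be declared as DataStream<GenericRecord>, and fields read with genericRecord.get("COLUMN_NAME") exactly as in your non-Flink consumer.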

Cheers,
Gordon


