Hello,
I have a few topics that I want to read from Kafka, which consist mainly of a key-value pair: a timestamp (key) and a value (byte array). The byte array doesn't really have a class to deserialize into, since the Avro record we have comes from a "SELECT * FROM..." that selects several SQL tables, and each topic holds one of those tables. We're using a GenericRecord, and since we know the structure of the table from the name of the topic, we know the column names, like this:

    genericRecord.get("COLUMN_NAME").toString()

Given this, we're now trying to read a Kafka topic using Flink, and we have the code below. The environment is the StreamExecutionEnvironment, and the properties hold the Kafka serialization and deserialization settings and the Kafka and ZooKeeper IP addresses.

    class...

            DataStream<Object> messageStream = environment
                    .addSource(new FlinkKafkaConsumer010<>(baseTopic, new MyDeserializationSchema(schema), properties));

            messageStream.print();

            try {
                environment.execute();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            return false;
        }
    }

    class MyDeserializationSchema<T> implements DeserializationSchema<T> {

        private static final Logger log = LoggerFactory.getLogger(MyDeserializationSchema.class);

        private final Class<T> avrotype = (Class<T>) org.apache.avro.generic.GenericRecord.class;
        private final Schema schema;

        public MyDeserializationSchema(Schema schema) {
            this.schema = schema;
        }

        @Override
        public T deserialize(byte[] arg0) throws IOException {
            log.info("Starting deserialization");
            GenericRecord genericRecord;
            Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
            log.info(recordInjection.toString());
            genericRecord = recordInjection.invert(arg0).get();
            log.info(genericRecord.toString());
            return (T) genericRecord;
        }

        @Override
        public boolean isEndOfStream(T nextElement) {
            return false;
        }

        @Override
        public TypeInformation<T> getProducedType() {
            return TypeExtractor.getForClass(avrotype);
        }
    }

Executing this on our server generates the following:

    [2018-03-02 15:59:37,111] WARN Ignoring configured key DeSerializer (key.deserializer) (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09)
    Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumer09 is not serializable. The object probably contains or references non serializable fields.
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1460)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1404)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1386)
        at com.i2s.analytics.flink.executors.LKTreatyExecutor.execute(LKTreatyExecutor.java:153)
        at com.i2s.analytics.flink.job.DependenciesConsumer.main(DependenciesConsumer.java:66)
    Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
        ... 6 more

I can't understand why the logs refer to a FlinkKafkaConsumer09 when we're using the FlinkKafkaConsumer010 version. Also, how can we deserialize to a GenericRecord so we can access the record fields the same way we do when we read a Kafka topic without Flink?

Thanks in advance for any help.
The 010 consumer extends 09, so I'd guess whatever code is reporting the error identifies your FlinkKafkaConsumer010 by its superclass, FlinkKafkaConsumer09. I've seen this error a bunch, and it's because MyDeserializationSchema isn't serializable, or more likely one of its fields isn't, or one of the fields of its fields; you get the idea: everything in the object graph has to be serializable. Probably the easiest way to pin that down is to write a unit test that checks MyDeserializationSchema is serializable, essentially a test that writes an instance to an ObjectOutputStream.

On Fri, Mar 2, 2018 at 10:42 AM, Filipe Couto <[hidden email]> wrote:
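A minimal sketch of the kind of unit test suggested in the reply above, in plain Java with no Flink or Avro dependency. `NotSerializableThing` is a hypothetical stand-in for a field type like Avro's `Schema`, and `BrokenSchemaHolder` / `FixedSchemaHolder` are illustrative names, not classes from the thread. The check simply asks Java serialization to write the whole object graph, which is roughly what the stack trace shows Flink doing via `InstantiationUtil.serializeObject`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializabilityCheck {

    /** Hypothetical stand-in for a non-serializable field type such as Avro's Schema. */
    static class NotSerializableThing {
        final String name = "record";
    }

    /** Serializable on its own, but one of its fields is not, so the whole graph fails. */
    static class BrokenSchemaHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private final NotSerializableThing schema = new NotSerializableThing();
    }

    /** Same idea, but it only keeps a String, which is serializable. */
    static class FixedSchemaHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String schemaJson = "{\"type\":\"string\"}";
    }

    /** Returns true if Java serialization can write the object and everything it references. */
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            // The message names the offending class found while walking the object graph.
            System.out.println("Not serializable: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new IllegalStateException("unexpected I/O failure", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new BrokenSchemaHolder()));
        System.out.println(isSerializable(new FixedSchemaHolder()));
    }
}
```

Running `main` should report `BrokenSchemaHolder` as not serializable (naming the nested `NotSerializableThing` class) and `FixedSchemaHolder` as serializable. In a real test, the same helper could be pointed at `new MyDeserializationSchema(schema)`.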
Hi Filipe,
What Gordon mentioned is correct. Did you manage to fix the issue? From your code snippet, it looks like the `Schema` field may not be serializable; the "Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema" line in your stack trace points the same way. Could you double-check that?

Cheers,
Gordon
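One common way to act on this, sketched here under assumptions: keep only the schema's JSON text (a String, which is serializable) in the object that Flink ships, and rebuild the parsed schema lazily in a `transient` field. The sketch is dependency-free, so `ParsedSchema` is a hypothetical stand-in for `org.apache.avro.Schema`, and `LazySchemaHolder` and `roundTrip` are illustrative names. Adapting it to `MyDeserializationSchema` would presumably mean storing `schema.toString()` in the constructor and calling `new Schema.Parser().parse(schemaJson)` in the lazy getter; that mapping is an assumption, not something confirmed in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazySchemaHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Hypothetical stand-in for org.apache.avro.Schema; deliberately NOT Serializable. */
    static final class ParsedSchema {
        final String json;

        ParsedSchema(String json) {
            this.json = json;
        }
    }

    // The only state written when this object is serialized.
    private final String schemaJson;

    // Skipped by Java serialization; rebuilt on first use after deserialization.
    private transient ParsedSchema schema;

    public LazySchemaHolder(String schemaJson) {
        this.schemaJson = schemaJson;
    }

    /** Lazily parses the schema; with Avro this would be new Schema.Parser().parse(schemaJson). */
    ParsedSchema schema() {
        if (schema == null) {
            schema = new ParsedSchema(schemaJson);
        }
        return schema;
    }

    boolean isParsed() {
        return schema != null;
    }

    /** Serializes and deserializes a holder, similar to shipping a function to a task. */
    static LazySchemaHolder roundTrip(LazySchemaHolder holder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(holder);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazySchemaHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LazySchemaHolder original = new LazySchemaHolder("{\"type\":\"string\"}");
        original.schema(); // parsed locally; the transient field is still not written
        LazySchemaHolder copy = roundTrip(original);
        System.out.println("parsed after round trip: " + copy.isParsed());
        System.out.println("schema JSON: " + copy.schema().json);
    }
}
```

Without the `transient` keyword, `roundTrip` would fail once `schema()` had been called, the same way the original `Schema` field does. With it, the copy arrives with a null field and re-parses the JSON on first use.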