Hi,
I tried to read Avro (RecordSchema) data from Kafka using the flink-kafka connector, but I have a problem. At program startup I get this exception:

    Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)

I know RecordSchema is not serializable, so the exception itself makes sense, but how can I add a serializer for RecordSchema?

My Flink initialization:

    LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    env.addSource(new KafkaSource("localhost:2181", "neverwinter", new MyDeserializer())).print();

The deserializer:

    public class MyDeserializer implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {

        private static final long serialVersionUID = -8314881700393464119L;
        private static final EncoderFactory avroEncoderFactory = EncoderFactory.get();

        private Schema _schema;

        public MyDeserializer() {
            System.out.println("Creating MyDeserializer");
            Schema.Parser parser = new Schema.Parser();
            try {
                InputStream is = getClass().getResourceAsStream("/avro_schema.json");
                if (is != null) {
                    _schema = parser.parse(is);
                } else {
                    System.out.println("Unable to load schema file!");
                }
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }

        public TypeInformation<String> getProducedType() {
            return TypeExtractor.getForClass(String.class);
        }

        public String deserialize(byte[] message) {
            String data = null;
            try {
                DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(_schema);
                Decoder decoder = DecoderFactory.get().binaryDecoder(message, null);
                GenericRecord result = reader.read(null, decoder);
                AvroKafkaData ad = new AvroKafkaData((Integer) result.get("id"),
                        (Integer) result.get("random"), String.valueOf(result.get("data")));
                data = ad.toString();
                System.out.println("Read kafka data: " + data);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return data;
        }

        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        public byte[] serialize(String element) {
            System.out.println("Serializing element = " + element);
            byte[] data = null;
            try {
                GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(_schema);
                ByteArrayOutputStream stream = new ByteArrayOutputStream();
                DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(_schema);
                Decoder decoder = DecoderFactory.get().jsonDecoder(_schema, element);
                GenericRecord r = reader.read(null, decoder);
                BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);
                writer.write(r, binaryEncoder);
                binaryEncoder.flush();
                IOUtils.closeStream(stream);
                data = stream.toByteArray();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return data;
        }
    }

Unfortunately, as far as I can see, only the constructor of MyDeserializer is called. Could somebody suggest something?

Thanks,
Ferenc
Hi,

Yes, the Avro Schema is not serializable. Can you make the "_schema" field transient and then lazily initialize it when serialize()/deserialize() is called? That way the schema is initialized on the cluster, so there is no need to transfer it over the network.

I think Flink's own serialization stack should also be able to handle Avro types with Kafka. I'm trying to get the required tooling into Flink 0.10-SNAPSHOT.

Let me know if you need more help.

Best,
Robert
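For illustration, a minimal sketch of that transient/lazy-initialization idea, reusing the /avro_schema.json resource from the code above. The class name and the schema() helper are made up for the example, only the DeserializationSchema side is shown, and the import paths assume the Flink 0.9.x-era APIs used in this thread:

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    public class LazySchemaDeserializer implements DeserializationSchema<String> {

        private static final long serialVersionUID = 1L;

        // transient: excluded from Java serialization, so the NotSerializableException goes away
        private transient Schema _schema;

        private Schema schema() {
            if (_schema == null) {
                // parse the schema lazily, on first use, i.e. on the task manager
                try (InputStream is = getClass().getResourceAsStream("/avro_schema.json")) {
                    if (is == null) {
                        throw new IllegalStateException("avro_schema.json not found on the classpath");
                    }
                    _schema = new Schema.Parser().parse(is);
                } catch (IOException e) {
                    throw new RuntimeException("Could not load Avro schema", e);
                }
            }
            return _schema;
        }

        public String deserialize(byte[] message) {
            try {
                // use schema() instead of the field so the schema exists wherever this runs
                DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema());
                Decoder decoder = DecoderFactory.get().binaryDecoder(message, null);
                GenericRecord result = reader.read(null, decoder);
                return result.toString();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        public TypeInformation<String> getProducedType() {
            return TypeExtractor.getForClass(String.class);
        }
    }

Because the schema field is never populated on the client, there is nothing non-serializable to ship with the job; the same pattern would apply to the serialize() side as well.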
The functions/sources have an open() method that is intended exactly for this type of initialization (constructing the Avro Schema).

You can try to subclass the Kafka source and override the open() method to initialize the schema there. Make sure you call super.open().

Greetings,
Stephan
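For illustration, a rough sketch of that subclassing idea. The class name and the initSchema() call are made up for the example, and it assumes the KafkaSource used above exposes RichFunction's open(Configuration); the exact signature and import path may need adjusting for the connector version in use:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;

    public class AvroKafkaSource extends KafkaSource<String> {

        private static final long serialVersionUID = 1L;

        // keep a reference so open() can finish setting up the deserializer
        private final MyDeserializer deserializer;

        public AvroKafkaSource(String zookeeperAddress, String topicId, MyDeserializer deserializer) {
            super(zookeeperAddress, topicId, deserializer);
            this.deserializer = deserializer;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // let the Kafka source set itself up first
            super.open(parameters);
            // build the non-serializable Avro Schema here, on the task manager,
            // instead of in the deserializer's constructor
            deserializer.initSchema(); // hypothetical init method that parses /avro_schema.json
        }
    }

Usage would then be env.addSource(new AvroKafkaSource("localhost:2181", "neverwinter", new MyDeserializer())).print(), with MyDeserializer no longer parsing the schema in its constructor.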