Reading from AVRO files

Posted by Lorenzo Nicora on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Reading-from-AVRO-files-tp35850.html

Hi,

I need to continuously ingest AVRO files as they arrive.
The files are written by a Kafka Connect S3 sink, but S3 is not the point here. To start with, I tried ingesting a static batch of files from the local filesystem, and I am running into weird AVRO deserialization issues.

I should mention that the records contain logical types: timestamp-millis and decimal.
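
For context, here is a minimal sketch of what I understand the decimal logical type to hold, using only the JDK. Per the AVRO specification, the stored bytes are the big-endian two's-complement unscaled value, and the scale comes from the schema rather than from the data. The class and method names are made up for illustration and are not part of AVRO or Flink.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical helper, not an AVRO or Flink class: shows how the raw value
// of an AVRO decimal maps to and from java.math.BigDecimal.
class DecimalLogicalTypeSketch {

    // The bytes are the big-endian two's-complement unscaled integer;
    // the scale is taken from the schema.
    static BigDecimal decode(byte[] unscaledBytes, int scale) {
        return new BigDecimal(new BigInteger(unscaledBytes), scale);
    }

    // setScale without a rounding mode throws ArithmeticException if the
    // value cannot be represented exactly at the schema's scale.
    static byte[] encode(BigDecimal value, int scale) {
        return value.setScale(scale).unscaledValue().toByteArray();
    }

    public static void main(String[] args) {
        byte[] raw = encode(new BigDecimal("123.45"), 2); // unscaled value 12345
        System.out.println(decode(raw, 2)); // prints 123.45
    }
}
```

A reader that applies no decimal conversion would hand back only the raw bytes, without the scale.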

To keep it simple, I extracted the AVRO schema from the data files and used avro-maven-plugin to generate the POJOs.
I tried multiple combinations, all with no luck.

1) Specific record generated with AVRO 1.8.2 plugin

    Path in = new Path(sourceBasePath);
    AvroInputFormat<AccountEntries> inputFormat = new AvroInputFormat<>(in, AccountEntries.class);
    DataStream<AccountEntries> accountEntries = env
        .readFile(inputFormat, sourceBasePath, FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);

Result
java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime 
(IIRC this is a known AVRO 1.8.2 issue)


2) Specific record generated with AVRO 1.9.2 plugin
Same code as above, but the AVRO POJOs are generated with AVRO 1.9.2.

Result
org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant
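
Both failures so far seem to revolve around the same mapping. As far as I understand, timestamp-millis is stored as a plain long (milliseconds since the Unix epoch), and something has to convert it to and from the date-time class used by the generated POJO. A minimal JDK-only sketch of that conversion follows. The class and method names are hypothetical, and this is not how AVRO or Flink implement it internally.

```java
import java.time.Instant;

// Hypothetical helper, not an AVRO or Flink class: the two directions of the
// timestamp-millis mapping, expressed with java.time.Instant.
class TimestampMillisSketch {

    // Read side: raw long from the file -> Instant for the record field.
    static Instant decode(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // Write side: Instant from the record field -> raw long.
    // Any sub-millisecond precision of the Instant is truncated here.
    static long encode(Instant instant) {
        return instant.toEpochMilli();
    }

    public static void main(String[] args) {
        long raw = 1_591_000_000_000L; // example value, not taken from my data
        Instant ts = decode(raw);
        System.out.println(ts + " -> " + encode(ts));
    }
}
```

If the read-side step is skipped, the field still holds a Long. If the write-side step is skipped, the serializer is handed an Instant it does not know how to write. That would be consistent with the two exceptions above, but it is only my reading of them.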


3) Generic record
For convenience, I am getting the Schema from the generated specific record, but I am not using the generated POJO as the record type.
I also followed the suggestion in this Flink blog post to explicitly specify the TypeInfo with returns(...):

    Path in = new Path(config.sourceFileSystemPath);
    Schema schema = AccountEntries.getClassSchema();
    AvroInputFormat<GenericRecord> inputFormat = new AvroInputFormat<>(in, GenericRecord.class);
    DataStream<GenericRecord> accountEntries = env
        .readFile(inputFormat, config.sourceFileSystemPath, FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
        .returns(new GenericRecordAvroTypeInfo(schema));

Result
The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The class is not a proper class. It is either abstract, an interface, or a primitive type.

This looks like a bug.
I raised a ticket and I will try to submit a fix, but that still does not solve my problem, because I am using a managed Flink that I cannot update.
I cannot believe there is no workaround. I do not think I'm trying to do anything bizarre. Am I?

Any ideas?
Am I missing something obvious?

Cheers
Lorenzo