http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Reading-from-AVRO-files-tp35850.html
Hi,
I need to continuously ingest AVRO files as they arrive.
Files are written by a Kafka Connect S3 sink connector, but S3 is not the point here. I started by trying to ingest a static batch of files from the local filesystem first, and I am hitting weird issues with AVRO deserialization.
I should mention that the records contain logical types: timestamp-millis and decimal.
To keep it simple, I extracted the AVRO schema from the data files and used avro-maven-plugin to generate POJOs.
I tried multiple combinations, all with no luck:
1) Specific record generated with AVRO 1.8.2 plugin
Path in = new Path(sourceBasePath);
AvroInputFormat<AccountEntries> inputFormat = new AvroInputFormat<>(in, AccountEntries.class);
DataStream<AccountEntries> accountEntries = env
    .readFile(inputFormat, sourceBasePath, FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
Result
java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
(IIRC this is a known AVRO 1.8.2 issue)
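For context, here is a stdlib-only sketch (my own illustration, not code from the job) of what that cast failure suggests: timestamp-millis is physically encoded as a long of epoch milliseconds, and a logical-type-aware reader is supposed to convert it into a time object. When that conversion is not applied, the raw Long leaks through to the generated getter, which then fails to cast it to org.joda.time.DateTime:

```java
import java.time.Instant;

public class TimestampMillisDemo {
    public static void main(String[] args) {
        // Avro's timestamp-millis logical type is stored as a long:
        // milliseconds since the Unix epoch.
        long encoded = 1_600_000_000_000L;

        // A logical-type-aware reader converts that long into a time
        // object (Joda DateTime under Avro 1.8, java.time under 1.9+).
        // If the conversion is not registered, the raw Long reaches the
        // generated POJO and the cast to DateTime blows up.
        Instant decoded = Instant.ofEpochMilli(encoded);

        System.out.println(decoded); // 2020-09-13T12:26:40Z
    }
}
```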
2) Specific record generated with AVRO 1.9.2 plugin
Same code as above but AVRO POJOs are generated with AVRO 1.9.2
Result
org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant
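One avenue I am considering (an assumption on my side, not verified against this setup): if the managed Flink runtime bundles Avro 1.8.x, it will not know the java.time types that 1.9-generated POJOs use. The 1.9.x avro-maven-plugin can still emit Joda-based accessors via its dateTimeLogicalTypeImplementation option, e.g.:

```xml
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.9.2</version>
  <configuration>
    <!-- Generate Joda-based accessors so the POJOs stay compatible with
         an Avro 1.8.x runtime (assumption: the managed Flink bundles
         Avro 1.8.x, which predates java.time support). -->
    <dateTimeLogicalTypeImplementation>joda</dateTimeLogicalTypeImplementation>
  </configuration>
</plugin>
```

Though if the conversion bug from case 1 lives in the 1.8.x runtime rather than in the generated code, this may just trade one error for the other.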
3) Generic record
I am getting the Schema from the generated specific record, for convenience, but I am not using the generated POJO as record.
I also followed the suggestion in this Flink blog post to explicitly specify the TypeInfo with returns(...):

Path in = new Path(config.sourceFileSystemPath);
Schema schema = AccountEntries.getClassSchema();
AvroInputFormat<GenericRecord> inputFormat = new AvroInputFormat<>(in, GenericRecord.class);
DataStream<GenericRecord> accountEntries = env
    .readFile(inputFormat, config.sourceFileSystemPath, FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
    .returns(new GenericRecordAvroTypeInfo(schema));
Result
The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The class is not a proper class. It is either abstract, an interface, or a primitive type.
This looks like a bug.
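One thing I might still try (an untested sketch, assuming readFile has an overload that accepts an explicit TypeInformation): pass the GenericRecordAvroTypeInfo directly to readFile instead of calling returns(...) afterwards, so the TypeExtractor is never asked to introspect the non-instantiable GenericRecord interface:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class GenericRecordWorkaround {
    public static DataStream<GenericRecord> read(
            StreamExecutionEnvironment env, String path, Schema schema, long intervalMs) {
        AvroInputFormat<GenericRecord> inputFormat =
                new AvroInputFormat<>(new Path(path), GenericRecord.class);
        // Supply the type information up front rather than via returns(...),
        // so the type extractor is bypassed entirely.
        return env.readFile(inputFormat, path,
                FileProcessingMode.PROCESS_CONTINUOUSLY, intervalMs,
                new GenericRecordAvroTypeInfo(schema));
    }
}
```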
I raised a ticket and I will try to submit a fix, but that still does not solve my problem, as I am on a managed Flink service I cannot update.
I cannot believe there is no workaround. I do not think I'm trying to do anything bizarre. Am I?
Any ideas?
Am I missing something obvious?
Cheers