Example - Reading Avro Generic records


Tarandeep Singh
Hi,

Can someone please point me to an example of creating a DataSet from Avro generic records?

I tried this code:

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final Path iPath = new Path(args[0]);

    DataSet<GenericRecord> dataSet = env.createInput(new AvroInputFormat<>(iPath, GenericRecord.class));
    dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer, String>>() {
        @Override
        public Tuple2<Integer, String> map(GenericRecord record) {
            Integer id = (Integer) record.get("id");
            String userAgent = (String) record.get("user_agent");
            return new Tuple2<>(id, userAgent);
        }
    }).writeAsText(args[1]);

    env.execute();

But I got an exception:

Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
    at org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
    at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
    at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

From the stack trace, it looks like AvroInputFormat tries to read the Avro file as SpecificRecords. Is there a way to read an Avro file as GenericRecords?


Thanks,
Tarandeep
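
The stack trace suggests the input format always derives a schema from the requested class, which cannot work for the GenericRecord interface. One way a format could avoid this is to pick the kind of datum reader from the requested value type. Below is a minimal sketch of that dispatch. It uses stand-in interfaces instead of the real Avro types, every name in it is hypothetical, and it is not Flink's actual code.

```java
final class ReaderChoiceSketch {
    // Stand-ins for Avro's GenericRecord and SpecificRecord (hypothetical names).
    interface GenericLike { }
    interface SpecificLike { }

    // Hypothetical generated class. It implements both interfaces on purpose,
    // to show why the specific check has to come first.
    static final class User implements SpecificLike, GenericLike { }

    enum ReaderKind { GENERIC, SPECIFIC, REFLECT }

    /** Chooses a reader kind from the value type instead of assuming one. */
    static ReaderKind chooseReader(Class<?> valueType) {
        if (SpecificLike.class.isAssignableFrom(valueType)) {
            return ReaderKind.SPECIFIC;   // schema comes from the generated class
        }
        if (GenericLike.class.isAssignableFrom(valueType)) {
            return ReaderKind.GENERIC;    // schema must come from the file or the caller
        }
        return ReaderKind.REFLECT;        // plain classes: schema derived by reflection
    }

    public static void main(String[] args) {
        System.out.println(chooseReader(GenericLike.class));
        System.out.println(chooseReader(User.class));
        System.out.println(chooseReader(String.class));
    }
}
```

The point of the sketch is only the ordering of the checks: a generic record carries no schema in its class, so that case needs its own branch rather than falling through to reflection.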

Re: Example - Reading Avro Generic records

Sourigna Phetsarath
There isn't a way yet, but I am proposing to add one: https://issues.apache.org/jira/browse/FLINK-3691

--

Gna Phetsarath
System Architect // AOL Platforms // Data Services // Applied Research Chapter
770 Broadway, 5th Floor, New York, NY 10003
o: 212.402.4871 // m: 917.373.7363
vvmr: 8890237 
aim: sphetsarath20 t: @sourigna



Re: Example - Reading Avro Generic records

Sourigna Phetsarath
Tarandeep,

There isn't a way yet, but I am proposing to add one: https://issues.apache.org/jira/browse/FLINK-3691

-Gna


Re: Example - Reading Avro Generic records

Tarandeep Singh
Thank you, Gna, for opening the ticket.

I looked into the AvroInputFormat code and, inspired by it, wrote a GenericAvroInputFormat. The code is very similar to (and hence largely redundant with) the original AvroInputFormat, so it would be a good idea to modify AvroInputFormat in Flink to support GenericRecord.

Anyway, I am pasting the code here for anyone who wants to use it (until your code is part of a stable Flink release):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.InstantiationUtil;

public class GenericAvroInputFormat extends FileInputFormat<GenericRecord> implements ResultTypeQueryable<GenericRecord> {

    private transient long end;
    private transient Schema schema;
    private transient FileReader<GenericRecord> fileReader;
    private boolean reuseAvroValue = true;

    private static final long serialVersionUID = 1L;

    public GenericAvroInputFormat(Path filePath, Schema schema) {
        super(filePath);
        this.schema = schema;
    }

    public void setReuseAvroValue(boolean reuseAvroValue) {
        this.reuseAvroValue = reuseAvroValue;
    }

    public void setUnsplittable(boolean unsplittable) {
        this.unsplittable = unsplittable;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        // Wrap the Flink input stream so that Avro can seek within the file.
        SeekableInput sin = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
        DatumReader<GenericRecord> reader = new GenericDatumReader<>();
        fileReader = DataFileReader.openReader(sin, reader);
        // Move to the next sync point after the start of this split.
        fileReader.sync(split.getStart());
        this.end = split.getStart() + split.getLength();
    }

    @Override
    public boolean reachedEnd() throws IOException {
        // Stop at the end of the file or once the reader is past the sync point after the split end.
        return !fileReader.hasNext() || fileReader.pastSync(end);
    }

    @Override
    public GenericRecord nextRecord(GenericRecord reuseValue) throws IOException {
        if (reachedEnd()) {
            return null;
        }

        if (!reuseAvroValue) {
            reuseValue = InstantiationUtil.instantiate(GenericRecord.class, Object.class);
        }

        reuseValue = fileReader.next(reuseValue);
        return reuseValue;
    }
}

Usage:

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final Path inPath = new Path(args[0]);

        Schema schema = new Schema.Parser().parse(new File("/path/to/schemafile.avsc"));
        DataSet<GenericRecord> dataSet = env.createInput(new GenericAvroInputFormat(inPath, schema));
        dataSet.map(new MapFunction<GenericRecord, Tuple2<Long, String>>() {
            @Override
            public Tuple2<Long, String> map(GenericRecord record) {
                Long id = (Long) record.get("id");
                String someString = record.get("somestring").toString();
                return new Tuple2<>(id, someString);
            }
        }).writeAsText(args[1]);

        env.execute();
    }

-Tarandeep
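
The open() and reachedEnd() methods in the class above rely on sync points so that parallel splits do not read the same Avro block twice. The idea can be illustrated with a deliberately simplified model in plain Java: each block is identified by its start offset, and a split owns the blocks that start inside its byte range. The class name, the method name, and the offsets are all made up for illustration, and the model ignores details such as the size of the sync marker.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitSketch {
    /**
     * Returns the offsets of the blocks that a split [start, start + length) reads.
     * blockOffsets must be sorted in ascending order.
     */
    static List<Long> blocksForSplit(long[] blockOffsets, long start, long length) {
        long end = start + length;
        List<Long> owned = new ArrayList<>();
        for (long offset : blockOffsets) {
            if (offset < start) {
                continue;   // roughly what sync(start) does: skip ahead to the first boundary in range
            }
            if (offset >= end) {
                break;      // roughly what pastSync(end) signals: the next split owns this block
            }
            owned.add(offset);
        }
        return owned;
    }

    public static void main(String[] args) {
        long[] blocks = {100L, 900L, 1700L, 2500L};   // hypothetical block start offsets
        for (long start = 0; start < 3000; start += 1000) {
            System.out.println("split at " + start + " reads " + blocksForSplit(blocks, start, 1000));
        }
    }
}
```

In this model every block is read by exactly one split, even though the split boundaries fall in the middle of blocks.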








Re: Example - Reading Avro Generic records

Sourigna Phetsarath
Tarandeep,

Thanks for pasting your code!

I have a PR ready that extends AvroInputFormat and will submit it soon.

I am still waiting for the legal team at AOL to approve it.

-Gna

Re: Example - Reading Avro Generic records

Sourigna Phetsarath
Tarandeep,

Also, in your code example, when reuseAvroValue is false the code will fail with this message:

java.lang.RuntimeException: The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The class is no proper class, it is either abstract, an interface, or a primitive type.
    at org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:222)
    at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:147)
    at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:122)
    at

I encountered this when I was writing the PR.

-Gna
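
The failure above happens because GenericRecord is an interface, so there is no constructor to call reflectively. A possible workaround, sketched below with stand-in types in plain Java, is to skip the reflective instantiation and hand the reader null when no reuse is wanted, so that the reader allocates the record itself. This assumes the underlying reader accepts a null reuse object (Avro's file readers do, as far as I know). All names in the sketch are hypothetical.

```java
import java.lang.reflect.Modifier;

final class ReuseSketch {
    /** Stand-in for GenericRecord: an interface, so it has no constructor. */
    interface RecordLike { }

    /** Stand-in for the concrete record type a reader would allocate. */
    static final class ConcreteRecord implements RecordLike { }

    /** True if the class is concrete and has a no-argument constructor. */
    static boolean isInstantiable(Class<?> clazz) {
        if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
            return false;
        }
        try {
            clazz.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Hypothetical reader call: reuses the given object or allocates a new one. */
    static RecordLike next(RecordLike reuse) {
        return reuse != null ? reuse : new ConcreteRecord();
    }

    /** Passes null instead of instantiating the interface when reuse is off. */
    static RecordLike nextRecord(RecordLike reuseValue, boolean reuseAvroValue) {
        return next(reuseAvroValue ? reuseValue : null);
    }

    public static void main(String[] args) {
        RecordLike r = new ConcreteRecord();
        System.out.println(isInstantiable(RecordLike.class));
        System.out.println(nextRecord(r, true) == r);
        System.out.println(nextRecord(r, false) == r);
    }
}
```

Applied to the input format posted earlier in the thread, the equivalent change would be to replace the InstantiationUtil call with a null reuse value, but that is untested here.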
