Problems reading Parquet input from HDFS


Problems reading Parquet input from HDFS

Lukas Kircher
Hello,

I am trying to read Parquet files from HDFS and am running into problems. I use Avro for the schema. Here is a basic example:

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Job job = Job.getInstance();
    HadoopInputFormat<Void, Customer> hif = new HadoopInputFormat<>(new AvroParquetInputFormat(), Void.class,
        Customer.class, job);
    FileInputFormat.addInputPath((JobConf) job.getConfiguration(), new org.apache.hadoop.fs.Path(
        "/tmp/tpchinput/01/customer_parquet"));
    Schema projection = Schema.createRecord(Customer.class.getSimpleName(), null, null, false);
    List<Schema.Field> fields = Arrays.asList(
        new Schema.Field("c_custkey", Schema.create(Schema.Type.INT), null, (Object) null)
    );
    projection.setFields(fields);
    AvroParquetInputFormat.setRequestedProjection(job, projection);

    DataSet<Tuple2<Void, Customer>> dataset = env.createInput(hif);
    dataset.print();
}
If I submit this to the job manager I get the following stack trace:

java.lang.NoSuchMethodError: org.apache.avro.Schema$Field.<init>(Ljava/lang/String;Lorg/apache/avro/Schema;Ljava/lang/String;Ljava/lang/Object;)V
at misc.Misc.main(Misc.java:29)

The problem is that I use the parquet-avro dependency (which provides AvroParquetInputFormat) in version 1.9.0, which relies on Avro 1.8.0. flink-core itself relies on Avro 1.7.7. FYI, the dependency tree looks like this:

[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ flink-experiments ---
[INFO] ...:1.0-SNAPSHOT
[INFO] +- org.apache.flink:flink-java:jar:1.2.0:compile
[INFO] |  +- org.apache.flink:flink-core:jar:1.2.0:compile
[INFO] |  |  \- (org.apache.avro:avro:jar:1.7.7:compile - omitted for conflict with 1.8.0)
[INFO] |  \- org.apache.flink:flink-shaded-hadoop2:jar:1.2.0:compile
[INFO] |     \- (org.apache.avro:avro:jar:1.7.7:compile - omitted for duplicate)
[INFO] \- org.apache.parquet:parquet-avro:jar:1.9.0:compile
[INFO]    \- org.apache.avro:avro:jar:1.8.0:compile

Fixing the above NoSuchMethodError just leads to further problems. Downgrading parquet-avro to an older version creates other conflicts, as there is no version that uses Avro 1.7.7 the way Flink does.
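
For context, the constructor in the stack trace, Schema.Field(String, Schema, String, Object), only exists from Avro 1.8 on; in 1.7.7 the default value is passed as a Jackson JsonNode instead. Below is a rough sketch of the 1.7.7-style projection (it only compiles if Avro 1.7.7 is forced onto the classpath, and then parquet-avro 1.9.0 breaks elsewhere, which is what I mean by further problems); the class name Avro177Projection is just for illustration:

import java.util.Arrays;
import java.util.List;

import org.apache.avro.Schema;
import org.codehaus.jackson.JsonNode;

// Sketch only, assuming Avro 1.7.7 on the compile classpath: the same projection as above,
// built with the 1.7.7 Field constructor that takes a Jackson JsonNode default value.
public class Avro177Projection {
    public static Schema customerProjection() {
        Schema projection = Schema.createRecord("Customer", null, null, false);
        List<Schema.Field> fields = Arrays.asList(
            // a null JsonNode means "no default value", same intent as (Object) null above
            new Schema.Field("c_custkey", Schema.create(Schema.Type.INT), null, (JsonNode) null));
        projection.setFields(fields);
        return projection;
    }
}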

Is there a way around this or can you point me to another approach to read Parquet data from HDFS? How do you normally go about this?

Thanks for your help,
Lukas




Re: Problems reading Parquet input from HDFS

Jörn Franke
Why not use a Parquet-only format? Not sure why you need an Avro Parquet format.


Re: Problems reading Parquet input from HDFS

Flavio Pompermaier
I started from this guide:


Best,
Flavio 


Re: Problems reading Parquet input from HDFS

Lukas Kircher
Thanks for your suggestions.

@Flavio
This is very similar to the code I use and yields basically the same problems. The examples are based on flink-1.0-SNAPSHOT and avro-1.7.6, which is more than three years old. Do you have a working setup with newer versions of Avro and Flink?

@Jörn
I tried to do that, but I can't see how to get around the AvroParquetInputFormat (see below). I can pass a projection schema as a string, but then I get a NullPointerException because no ReadSupport class is set on the ParquetInputFormat. There is a constructor that instantiates ParquetInputFormat with a class extending ReadSupport, but I haven't found a suitable one to pass to it. Do you know of a way around this?


  public static void main(String[] args) throws Exception {
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      Job job = Job.getInstance();
      HadoopInputFormat<Void, Customer> hif = new HadoopInputFormat<>(new ParquetInputFormat(), Void.class,
          Customer.class, job);
      FileInputFormat.addInputPath((JobConf) job.getConfiguration(), new org.apache.hadoop.fs.Path(
          "/tmp/tpchinput/01/customer_parquet"));
      job.getConfiguration().set("parquet.avro.projection", "{\"type\":\"record\",\"name\":\"Customer\",\"fields\":[{\"name\":\"c_custkey\",\"type\":\"int\"}]}");
      env.createInput(hif).print();
  }
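
The only ReadSupport I can see that does not involve Avro is Parquet's bundled GroupReadSupport, which hands records back as generic Group objects instead of Customer instances. A rough, untested sketch of that route (same placeholder path as above) would be:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Job job = Job.getInstance();
    // Register a ReadSupport so ParquetInputFormat knows how to materialize records;
    // this is what is missing in the variant above and what causes the NullPointerException.
    ParquetInputFormat.setReadSupportClass(job, GroupReadSupport.class);
    FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(
        "/tmp/tpchinput/01/customer_parquet"));

    HadoopInputFormat<Void, Group> hif = new HadoopInputFormat<>(
        new ParquetInputFormat<Group>(), Void.class, Group.class, job);

    // Records come back as generic Groups here, not as Customer objects.
    DataSet<Tuple2<Void, Group>> dataset = env.createInput(hif);
    dataset.print();
}

That would avoid the Avro version clash, but it also gives up the generated Customer type.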


I am pretty sure I am missing something very basic. Let me know if you need any additional information.

Thanks ...




Re: Problems reading Parquet input from HDFS

Flavio Pompermaier
Hi Lukas,
a colleague of mine issued a PR to update the code of that example (https://github.com/FelixNeutatz/parquet-flinktacular/pull/2).

We updated Avro to 1.8.1 and the example worked fine (we haven't tested it on the cluster yet).
Let me know if it helps.

Best,
Flavio


Re: Problems reading Parquet input from HDFS

Lukas Kircher
Hi Flavio,

thanks for your help. With Flink 1.2.0 and Avro 1.8.1 it works fine for me too, as long as I run it from the IDE. As soon as I submit it as a job to the cluster, I get the dependency issues described before.

* If I use the Flink 1.2.0 binary and just add Flink as a Maven dependency to my project, I get a NoSuchMethodError because the Avro functionality I use is not present in Avro 1.7.7, which Flink 1.2.0 relies on. Since I run against the Flink binary, this is only detected at runtime: locally (in the IDE) Maven resolves Avro 1.8.1, but the binary on the cluster knows nothing about that (see the small classpath check sketched after this list).

* If I base my complete project on the Flink source code and build Flink myself (instead of using the binary), I run into several compilation errors from the start. If I try to bump the Avro dependency in Flink to a newer version, several issues arise, e.g. in flink-core, that I cannot resolve on my own. This also means that with Avro 1.8.1 there are incompatibilities at runtime, but they only surface when the affected code is actually executed.
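
As a side note, a quick way to see which Avro jar actually wins on a given classpath (a generic JVM check, nothing Flink-specific) is to print where the Schema class was loaded from, once in the IDE and once inside the submitted job:

    // Prints the jar (or directory) the Avro Schema class was loaded from.
    System.out.println(org.apache.avro.Schema.class
        .getProtectionDomain().getCodeSource().getLocation());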

At the moment I am thinking about switching to e.g. Protobuf, but I'm not yet sure whether that would solve my issue.

Best,
Lukas

