Parquet data stream group converter error

Jesse Lord

I am trying to read a Parquet file into a DataStream and then register that stream as a temporary table. The file was created by Spark 2.4 in HDFS on AWS EMR. I am using Flink 1.10.0 with EMR 5.30.

I am getting the following error:

Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
        at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
        at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
        at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
        at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
        at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
        at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
        at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
        at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
        at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
        at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
        at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
        at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
        at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:327)

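Below is a snippet of code that shows how I am trying to read the Parquet data:

    import org.apache.flink.formats.parquet.ParquetRowInputFormat;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.schema.MessageType;

    // env is a StreamExecutionEnvironment and tEnv the StreamTableEnvironment built on it.

    // Take the Parquet schema from the footer of one file in the directory;
    // try-with-resources closes the reader once the schema has been read.
    String filePath = "hdfs:///path/to/single/file.parquet";
    MessageType schema;
    try (ParquetFileReader reader = ParquetFileReader.open(
            HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(filePath), new Configuration()))) {
        schema = reader.getFooter().getFileMetaData().getSchema();
    }

    // Read the Parquet files in the directory as a stream of Rows using that schema.
    String parquetPath = "hdfs:///path/to/parquet/directory";
    DataStream<Row> parquetStream = env.readFile(
            new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(parquetPath), schema),
            parquetPath);

    // Expose the stream to SQL as a temporary view.
    Table parquetTable = tEnv.fromDataStream(parquetStream);
    tEnv.createTemporaryView("isession", parquetTable);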
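
The exception says Parquet asked for a group converter at some level of a column path but Flink's RowConverter supplied a primitive one, so it may help to know which fields in the footer schema are groups (nested structs, or LIST/MAP wrappers). The following is a minimal sketch, assuming parquet-column is on the classpath; GroupFieldFinder is a hypothetical helper name, and the schema string in main is made up for illustration, not taken from the actual file.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.Type;

/** Hypothetical helper: lists every group (non-primitive) field in a Parquet schema. */
public class GroupFieldFinder {

    /** Returns dotted paths of all group fields, parents before children. */
    public static List<String> groupFieldPaths(GroupType root) {
        List<String> paths = new ArrayList<>();
        collect(root, "", paths);
        return paths;
    }

    private static void collect(GroupType group, String prefix, List<String> out) {
        for (Type field : group.getFields()) {
            if (field.isPrimitive()) {
                continue; // leaf columns are not groups; skip them
            }
            String path = prefix.isEmpty() ? field.getName() : prefix + "." + field.getName();
            out.add(path);
            collect(field.asGroupType(), path, out); // descend into nested groups
        }
    }

    public static void main(String[] args) {
        // Illustrative schema only: one primitive column and one nested LIST column.
        MessageType schema = MessageTypeParser.parseMessageType(
                "message spark_schema {"
                + " optional int64 id;"
                + " optional group tags (LIST) {"
                + "   repeated group list {"
                + "     optional binary element;"
                + "   }"
                + " }"
                + "}");
        // prints "tags" then "tags.list"
        for (String path : groupFieldPaths(schema)) {
            System.out.println(path);
        }
    }
}
```

In the job itself, the MessageType read from the footer can be passed to groupFieldPaths instead of the illustrative string.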

Thanks,

Jesse


Re: Parquet data stream group converter error

r_khachatryan
Hi,

> MessageType schema = reader.getFooter().getFileMetaData().getSchema();
The first thing I'd suggest is to verify that the file contains a valid schema and can be read by some other program, e.g. with parquet-tools schema or parquet-tools cat [1].
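
One way to run a similar check from the JVM is to open the file with plain parquet-mr, print its footer schema and read every record through parquet-mr's example Group API, so that Flink's RowConverter is not involved at all. This is a minimal sketch, assuming parquet-hadoop and the Hadoop client libraries are on the classpath; ParquetSanityCheck and its methods are hypothetical names, not part of any library.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

/** Hypothetical sanity check: read one Parquet file with parquet-mr only, outside Flink. */
public class ParquetSanityCheck {

    /** Returns the schema stored in the file footer. */
    public static MessageType readSchema(String file, Configuration conf) throws IOException {
        try (ParquetFileReader reader =
                ParquetFileReader.open(HadoopInputFile.fromPath(new Path(file), conf))) {
            return reader.getFooter().getFileMetaData().getSchema();
        }
    }

    /** Reads every record with the example Group read support and returns the count. */
    public static long countRecords(String file, Configuration conf) throws IOException {
        try (ParquetReader<Group> reader =
                ParquetReader.builder(new GroupReadSupport(), new Path(file)).withConf(conf).build()) {
            long count = 0;
            while (reader.read() != null) { // read() returns null at end of file
                count++;
            }
            return count;
        }
    }

    public static void main(String[] args) throws IOException {
        String file = args[0]; // e.g. a single file under the HDFS directory
        Configuration conf = new Configuration();
        System.out.println(readSchema(file, conf));
        System.out.println("records: " + countRecords(file, conf));
    }
}
```

If this reads every record while the Flink job still fails, the file itself is probably fine and the problem more likely lies in how Flink maps the schema to converters.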

Regards,
Roman


(In reply to Jesse Lord, Thu, Jul 2, 2020 at 11:36 PM.)


Re: Parquet data stream group converter error

Jesse Lord
I should have mentioned that I was able to read the same file with the batch ParquetTableSource. I only encounter this error when reading it as a stream.

- Jesse

(In reply to Khachatryan Roman, Friday, July 3, 2020 12:08:51 AM.)