Re: Question about processing a 3-level List data type in parquet


Re: Question about processing a 3-level List data type in parquet

Jingsong Li
Hi Naehee, sorry for the late reply.

I think you are right; there are bugs here. We didn't consider nested structures very carefully before.

Our main focus now is the new BulkFormat implementation; we will need to take nested structures into account when implementing the new ParquetBulkFormat.

Best,
Jingsong

On Tue, Nov 3, 2020 at 1:43 AM Naehee Kim <[hidden email]> wrote:
Hi Jingsong,

I am forwarding the email below to you, thinking you will have a good idea about my questions. I'd appreciate it if you could share your thoughts.

Thanks,
Naehee


---------- Forwarded message ---------
From: Naehee Kim <[hidden email]>
Date: Thu, Oct 29, 2020 at 4:38 PM
Subject: Question about processing a 3-level List data type in parquet
To: <[hidden email]>


Hi Flink Dev Community,

I've found that RowConverter.java in the flink-parquet module doesn't support reading a 3-level list type in Parquet, though it can process a 2-level list type.

3-level:
optional group my_list (LIST) {
  repeated group element {
    required binary str (UTF8);
  }
}

2-level:
optional group my_list (LIST) {
  repeated int32 element;
}
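
For reference, the parquet-format spec defines backward-compatibility rules that let a reader tell these two encodings apart by inspecting the repeated field inside the LIST-annotated group. Below is a minimal sketch of those rules using the org.apache.parquet.schema API; the helper name isThreeLevelList is mine, not part of any library:

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.Type;

// Sketch of the parquet-format LIST backward-compatibility rules.
static boolean isThreeLevelList(GroupType listType) {
    Type repeated = listType.getType(0); // the single repeated field
    if (repeated.isPrimitive()) {
        return false; // repeated primitive => legacy 2-level encoding
    }
    GroupType inner = repeated.asGroupType();
    // Legacy writers emit the element itself as the repeated group:
    // multiple fields, or a group named "array" or "<list-name>_tuple".
    if (inner.getFieldCount() > 1
            || inner.getName().equals("array")
            || inner.getName().equals(listType.getName() + "_tuple")) {
        return false;
    }
    return true; // standard 3-level: repeated group wraps one element field
}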

The Parquet file I am testing with was written by a Spark job and has a 3-level list type. When I try to process it, the job fails with a 'java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"' error.
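
(As a possible stopgap on the writer side, I believe Spark can be told to emit the legacy 2-level encoding via the spark.sql.parquet.writeLegacyFormat option; a rough, untested sketch with placeholder paths:)

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Rewrite the file using Spark's legacy 2-level list encoding.
SparkSession spark = SparkSession.builder().appName("rewrite-legacy").getOrCreate();
spark.conf().set("spark.sql.parquet.writeLegacyFormat", "true");
Dataset<Row> df = spark.read().parquet("file:///test-file.snappy.parquet");
df.write().parquet("file:///test-file-legacy.parquet");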

I've tested with Flink 1.9 and confirmed that RowConverter.java remains unchanged in v1.11. To process a 3-level list, I think RowConverter.java should be updated to use a new TypeInfo instead of BasicArrayTypeInfo (a 2-level list can be processed with BasicArrayTypeInfo). I wonder if my understanding is correct, and whether there is any plan to support the 3-level List data type in Parquet.
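
For illustration, the distinction in Flink's type system might look like this (a sketch of what RowConverter could produce for the two schemas above, not what it currently does):

import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

// A 2-level list of int32 maps naturally onto BasicArrayTypeInfo:
TypeInformation<Integer[]> twoLevel = BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO;

// A 3-level list whose element is a group (here, a single UTF8 field)
// would instead need an object-array type over Row:
TypeInformation<Row[]> threeLevel =
    ObjectArrayTypeInfo.getInfoFor(Row[].class, new RowTypeInfo(Types.STRING));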

For your reference, here is a code snippet along with the stack trace.
// Convert the Avro schema (REPORTING_SCHEMA, defined elsewhere) to a Parquet MessageType.
MessageType readSchema = new AvroSchemaConverter().convert(REPORTING_SCHEMA);
// Derive Flink row type information from the Parquet schema.
RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(readSchema);
// Create the input format over the test file and wire it into the stream environment.
ParquetRowInputFormat parquetInputFormat =
    new ParquetRowInputFormat(new Path("file:///test-file.snappy.parquet"), readSchema);
DataStreamSource<Row> dataSource = env.createInput(parquetInputFormat, rowTypeInfo);
-- stack trace
Job execution failed.
org.apache.flink.runtime.client.JobExecutionException:
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:78)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
at com.pinterest.xenon.flink.backfill.TestParquetSource.testParquetInsertionOriginalInputFormat(TestParquetSource.java:322)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at com.google.testing.junit.runner.internal.junit4.CancellableRequestFactory$CancellableRunner.run(CancellableRequestFactory.java:89)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
at com.google.testing.junit.runner.junit4.JUnit4Runner.run(JUnit4Runner.java:112)
at com.google.testing.junit.runner.BazelTestRunner.runTestsInSuite(BazelTestRunner.java:153)
at com.google.testing.junit.runner.BazelTestRunner.main(BazelTestRunner.java:84)
Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:333)
Thanks,
Naehee


--
Best, Jingsong Lee

Re: Question about processing a 3-level List data type in parquet

nkim
Hi Jingsong,

Thanks for the feedback. Could you let me know the concept and timeline for BulkFormat/ParquetBulkFormat, and how they differ from ParquetInputFormat?

Our use case is backfill: processing Parquet files whenever a data issue is found in the normal processing of Kafka input. We therefore want a job that can easily switch between Kafka input and Parquet file input, and vice versa (a rough sketch of what we have in mind follows below). I wonder whether ParquetBulkFormat would fit this use case.
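
(For what it's worth, a sketch of the switch we have in mind, reusing the parquetInputFormat and rowTypeInfo from my earlier snippet; the backfill flag, topic, Kafka settings, and the parseToRow helper are placeholders:)

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
boolean useBackfill = Boolean.parseBoolean(System.getProperty("backfill", "false"));

DataStream<Row> input;
if (useBackfill) {
    // Backfill path: read historical records from Parquet files.
    input = env.createInput(parquetInputFormat, rowTypeInfo);
} else {
    // Normal path: consume the same records from Kafka and convert to Row.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
    input = env
        .addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
        .map(value -> parseToRow(value)) // hypothetical record-to-Row helper
        .returns(rowTypeInfo);
}
// Downstream operators are shared by both paths.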

Best,
Naehee


Re: Question about processing a 3-level List data type in parquet

Peter Huang
Hi Naehee,

Thanks for reporting the issue. Yes, it is a bug in the ParquetInputFormat. Would you please create a Jira ticket and assign it to me? I will try to fix it by the end of this weekend.
My Jira account name is Zhenqiu Huang. Thanks!


Best Regards
Peter Huang

