Hi Naehee,
Thanks for reporting the issue. Yes, it is a bug in ParquetInputFormat. Would you please create a Jira ticket and assign it to me? I will try to fix it by the end of this weekend. My Jira account name is Zhenqiu Huang. Thanks
Best Regards,
Peter Huang
Hi Jingsong,
Thanks for the feedback. Could you let me know the concept and timeline of BulkFormat/ParquetBulkFormat, and how it differs from ParquetInputFormat?
Our use case is backfill: processing Parquet files when a data issue is found in the normal processing of the Kafka input. Thus, we want a job that can easily switch between Kafka input and Parquet file input, and vice versa. I wonder if ParquetBulkFormat can fit our use case.
Best, Naehee
Hi Naehee, sorry for the late reply.
I think you are right, there are bugs here. We didn't consider nested structures carefully before.
We are now mainly focused on the new BulkFormat implementation, and we will need to take nested structures into account when implementing the new ParquetBulkFormat.
Best, Jingsong
Hi Jingsong,
I am forwarding the email below to you, thinking you will have a good idea about my questions. I'd appreciate it if you could share your thoughts.
Thanks, Naehee

---------- Forwarded message ---------
From: Naehee Kim <[hidden email]>
Date: Thu, Oct 29, 2020 at 4:38 PM
Subject: Question about processing a 3-level List data type in parquet
To: <[hidden email]>

Hi Flink Dev Community,
I've found that RowConverter.java in the flink-parquet module doesn't support reading a 3-level list type in Parquet, though it is able to process a 2-level list type.
3-level:
optional group my_list (LIST) {
  repeated group element {
    required binary str (UTF8);
  };
}

2-level:
optional group my_list (LIST) {
  repeated int32 element;
}
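To make the structural difference concrete, the two shapes can be compared with parquet-mr's MessageTypeParser. This is just a sketch, assuming the org.apache.parquet parquet-column dependency is on the classpath; the class and method names below are the real parquet-mr schema API:

```java
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ListShapeCheck {
    // Returns true when the repeated child of my_list is a primitive type.
    static boolean repeatedChildIsPrimitive(String schema) {
        MessageType msg = MessageTypeParser.parseMessageType(schema);
        GroupType list = msg.getType("my_list").asGroupType();
        return list.getType(0).isPrimitive();
    }

    public static void main(String[] args) {
        // 3-level (standard) encoding: the repeated child is itself a group.
        String threeLevel = "message m { optional group my_list (LIST) {"
                + " repeated group element { required binary str (UTF8); } } }";
        // 2-level (legacy) encoding: the repeated child is the element itself.
        String twoLevel = "message m { optional group my_list (LIST) {"
                + " repeated int32 element; } }";

        System.out.println(repeatedChildIsPrimitive(threeLevel)); // false
        System.out.println(repeatedChildIsPrimitive(twoLevel));   // true
    }
}
```

A converter that assumes one shape breaks on the other: in the 3-level form the repeated child is a group, in the 2-level form it is a primitive, which matches the ClassCastException below.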
The parquet file I am testing with was written by a Spark job and has a 3-level list type. When I try to process it, it fails with a 'java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"' error.
I've tested with Flink 1.9 and checked that RowConverter.java remains the same in v1.11. To process a 3-level list, I think RowConverter.java should be updated with a new TypeInfo instead of BasicArrayTypeInfo. (A 2-level list can be processed with BasicArrayTypeInfo.) I wonder if my understanding is correct and whether there is any plan to support a 3-level list data type in Parquet.
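Roughly, here is what I have in mind. This is only a sketch of the type-info side, not tested against the RowConverter internals; BasicArrayTypeInfo, ObjectArrayTypeInfo, and RowTypeInfo are existing Flink classes, and the field name "str" is taken from the 3-level schema above:

```java
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class ListTypeInfoSketch {
    // 3-level case: each list element is a group with its own fields, so the
    // array type would need to wrap a RowTypeInfo rather than a basic
    // component type.
    static TypeInformation<Row[]> nestedElementArrayType() {
        RowTypeInfo element = new RowTypeInfo(
                new TypeInformation<?>[] { BasicTypeInfo.STRING_TYPE_INFO },
                new String[] { "str" });
        return ObjectArrayTypeInfo.getInfoFor(element);
    }

    public static void main(String[] args) {
        // 2-level case: a flat array of strings fits BasicArrayTypeInfo.
        System.out.println(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO);
        // 3-level case: an object array over a Row element.
        System.out.println(nestedElementArrayType());
    }
}
```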
For your reference, here are the code snippet and the stack trace.

MessageType readSchema = (new AvroSchemaConverter()).convert(REPORTING_SCHEMA);
RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(readSchema);
ParquetRowInputFormat parquetInputFormat = new ParquetRowInputFormat(
    new Path("file:///test-file.snappy.parquet"), readSchema);
DataStreamSource<Row> dataSource = env.createInput(parquetInputFormat, rowTypeInfo);

-- stack trace
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
	at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:78)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
	at com.pinterest.xenon.flink.backfill.TestParquetSource.testParquetInsertionOriginalInputFormat(TestParquetSource.java:322)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at com.google.testing.junit.runner.internal.junit4.CancellableRequestFactory$CancellableRunner.run(CancellableRequestFactory.java:89)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
	at com.google.testing.junit.runner.junit4.JUnit4Runner.run(JUnit4Runner.java:112)
	at com.google.testing.junit.runner.BazelTestRunner.runTestsInSuite(BazelTestRunner.java:153)
	at com.google.testing.junit.runner.BazelTestRunner.main(BazelTestRunner.java:84)
Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
	at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
	at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
	at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
	at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
	at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
	at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
	at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:333)
Thanks, Naehee
--