I have a Java app that uses a Flink SQL query to perform aggregations on a data stream read from Kafka. Attached is the Java file for reference.

The query results are written to S3. I can write successfully in JSON format, but when I try to use Parquet format, Flink fails with a schema error saying that min_ts is an empty optional group. I have verified that min_ts can never be null in our scheme of things.

Would appreciate help on this. Thanks!

Stack trace:

    Caused by: org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: optional group min_ts {
    }
        at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
        at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)
        at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)
        at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
        at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
        at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
        at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:233)
        at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
        at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:530)
        at com.dlvr.pipeline.falcon.sink.ParquetWriterSink.createAvroParquetWriter(ParquetWriterSink.java:37)
        at com.dlvr.pipeline.falcon.sink.ParquetWriterSink.lambda$forReflectRecord$3c375096$1(ParquetWriterSink.java:48)
        at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
        at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:222)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
        ... 50 more

Attachment: StatsPipeline.java (12K)
Hi Fatima,
I am not super familiar with Parquet, but your issue seems to be related to [1], which appears to be expected behaviour on the Parquet side. The reason seems to lie in the Parquet file format itself: it stores only the leaf (primitive) fields as columns, not the structure of the groups, so if a group has no fields, nothing is left in the file from which its schema could be inferred. Given this, I do not think it is a bug, but feel free to check further and let us know if I am wrong.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/PARQUET-278

On Tue, Feb 11, 2020 at 11:20 PM Fatima Omer <[hidden email]> wrote:
> I have a java app that is using a flink SQL query to perform aggregations on a data stream being read in from Kafka. Attached is the java file for reference.
>
> The query results are being written to s3. I can write successfully in Json format but when I try to use Parquet format, flink complains that min_ts is an optional group. I have verified that min_ts can never be null in our scheme of things.
>
> Would appreciate help on this. Thanks!
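Kostas's explanation (only leaf fields are stored, so an empty group cannot be represented) can be illustrated with a small, self-contained Java model. This is a hypothetical simplification, not the parquet-mr API: the `Node` type and the `leafColumnPaths` and `checkNoEmptyGroups` methods are invented for illustration, and the example schema (`key`, `count`, `min_ts`) only assumes that `min_ts` ended up as a group with no fields, as the stack trace reports. The model flattens a schema into one column per leaf path, shows that an empty group contributes no column at all, and rejects such a schema the way the writer in the stack trace does.

```java
import java.util.ArrayList;
import java.util.List;

public class EmptyGroupDemo {

    // Hypothetical, simplified schema node: either a leaf (a primitive column)
    // or a group of child nodes. This only mirrors the idea from the thread;
    // it is not how parquet-mr models types.
    static final class Node {
        final String name;
        final boolean isLeaf;
        final List<Node> children;

        private Node(String name, boolean isLeaf, List<Node> children) {
            this.name = name;
            this.isLeaf = isLeaf;
            this.children = children;
        }

        static Node leaf(String name) {
            return new Node(name, true, List.of());
        }

        static Node group(String name, Node... children) {
            return new Node(name, false, List.of(children));
        }
    }

    // One stored column per leaf, identified by its dotted path from the root.
    // Groups only survive as prefixes of these paths.
    static List<String> leafColumnPaths(Node root) {
        List<String> paths = new ArrayList<>();
        for (Node child : root.children) {
            collect(child, "", paths);
        }
        return paths;
    }

    private static void collect(Node node, String prefix, List<String> out) {
        String path = prefix.isEmpty() ? node.name : prefix + "." + node.name;
        if (node.isLeaf) {
            out.add(path);
        } else {
            for (Node child : node.children) {
                collect(child, path, out);
            }
        }
    }

    // Rejects any group without children, since it would leave no column behind.
    static void checkNoEmptyGroups(Node node) {
        if (!node.isLeaf && node.children.isEmpty()) {
            throw new IllegalArgumentException(
                "Cannot write a schema with an empty group: " + node.name);
        }
        for (Node child : node.children) {
            checkNoEmptyGroups(child);
        }
    }

    public static void main(String[] args) {
        // Hypothetical result schema in which min_ts is a group with no fields.
        Node broken = Node.group("result",
            Node.leaf("key"), Node.leaf("count"), Node.group("min_ts"));
        System.out.println(leafColumnPaths(broken)); // [key, count]
        try {
            checkNoEmptyGroups(broken);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }

        // Hypothetical variant in which min_ts is modeled as a primitive leaf.
        Node fixed = Node.group("result",
            Node.leaf("key"), Node.leaf("count"), Node.leaf("min_ts"));
        checkNoEmptyGroups(fixed); // passes: every group has at least one leaf
        System.out.println(leafColumnPaths(fixed)); // [key, count, min_ts]
    }
}
```

In the broken schema, `min_ts` does not appear in the list of stored columns at all, so a reader would have no way to learn that the group ever existed. That is why a writer refuses the schema up front instead of silently dropping the field.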