Flink complaining when trying to write to s3 in Parquet format

Fatima Omer
I have a Java app that uses a Flink SQL query to perform aggregations on a data stream read from Kafka. The Java file is attached for reference.

The query results are written to S3. Writing in JSON format works, but when I try to use Parquet format, Flink complains that min_ts is an empty optional group. I have verified that min_ts can never be null in our scheme of things.

Would appreciate help on this. Thanks!

Stack trace: 

Caused by: org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: optional group min_ts {
}
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
    at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
    at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
    at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:233)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:530)
    at com.dlvr.pipeline.falcon.sink.ParquetWriterSink.createAvroParquetWriter(ParquetWriterSink.java:37)
    at com.dlvr.pipeline.falcon.sink.ParquetWriterSink.lambda$forReflectRecord$3c375096$1(ParquetWriterSink.java:48)
    at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:222)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
    ... 50 more



Attachment: StatsPipeline.java (12K)

Re: Flink complaining when trying to write to s3 in Parquet format

Kostas Kloudas-2
Hi Fatima,

I am not very familiar with Parquet, but your issue seems to be
related to [1], which appears to be expected behaviour on the Parquet
side.
The reason seems to be the Parquet file format itself: it stores only
the leaf fields and not the structure of the groups, so if a group has
no fields, there is nothing to write for it and its schema cannot be
inferred.
Given this, I do not think that it is a bug, but feel free to check
further and let us know if I am wrong.
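
To make the check concrete, here is a minimal sketch in plain Java of the kind of validation that seems to be rejecting the schema. It is only a simplified model, not the parquet-mr code: the Node class, the firstEmptyGroup helper, and the "stats" and "count" names are all hypothetical, and only min_ts comes from the stack trace. The sketch walks a schema tree and reports the first group that has no fields.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of a Parquet-style schema tree (NOT the real parquet-mr classes). */
class EmptyGroupCheck {

    /** A schema node: either a primitive leaf or a group of child nodes. */
    static class Node {
        final String name;
        final List<Node> children; // null for a primitive leaf

        private Node(String name, List<Node> children) {
            this.name = name;
            this.children = children;
        }

        static Node leaf(String name) {
            return new Node(name, null);
        }

        static Node group(String name, Node... children) {
            return new Node(name, Arrays.asList(children));
        }

        boolean isGroup() {
            return children != null;
        }
    }

    /** Returns the name of the first group with no fields, or null if there is none. */
    static String firstEmptyGroup(Node node) {
        if (!node.isGroup()) {
            return null; // leaves are always writable
        }
        if (node.children.isEmpty()) {
            return node.name; // a group without fields has no columns to store
        }
        for (Node child : node.children) {
            String found = firstEmptyGroup(child);
            if (found != null) {
                return found;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical schemas: min_ts as a primitive leaf vs. as a group with no fields.
        Node ok = Node.group("stats", Node.leaf("count"), Node.leaf("min_ts"));
        Node bad = Node.group("stats", Node.leaf("count"), Node.group("min_ts"));

        System.out.println(firstEmptyGroup(ok));
        System.out.println(firstEmptyGroup(bad));
    }
}
```

In this model the first schema passes and the second is rejected because of min_ts, regardless of whether the values can ever be null. If that matches what happens here, the thing to look at would be how the min_ts field is mapped to a schema type (for example, whether it ends up as a record with no fields) rather than its nullability.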

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/PARQUET-278

On Tue, Feb 11, 2020 at 11:20 PM Fatima Omer <[hidden email]> wrote: