I have defined a streaming file sink for Parquet to store my Scala case class:
    StreamingFileSink
      .forBulkFormat(
        new Path(appArgs.datalakeBucket),
        ParquetAvroWriters.forReflectRecord(classOf[Log]))
      .withBucketAssigner(new TransactionLogHiveBucketAssigner())
      .build()

where my case class is

    case class Log(
      level: String,
      time_stamp: Option[Long] = None
    )

When Flink tries to write a specific instance to Parquet, e.g.

    Log("info", Some(1596975950000L))

it throws the following error:

    org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: required group time_stamp { }
        at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
        at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)
        at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)
        at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
        at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
        at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
        at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:280)
        at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:283)
        at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
        at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:87)
        at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
        at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
        at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:222)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:274)
        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:445)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:711)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:664)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
        at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:36)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:373)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)

Can Flink's Parquet writer not handle fields of type Option?

Thanks,
Hi Vikash,

The error comes from Parquet itself, in conjunction with Avro (which is used to infer the schema of your Scala class). The inferred schema is

    {
      "fields": [
        { "name": "level", "type": "string" },
        {
          "name": "time_stamp",
          "type": { "fields": [], "name": "Option", "namespace": "scala", "type": "record" }
        }
      ],
      "name": "Log",
      "namespace": "org.apache.flink.formats.parquet.avro",
      "type": "record"
    }

As you can see, Avro infers the schema by reflection and treats Option as an arbitrary class. Since scala.Option has no fields of its own, the schema contains an empty record, which Parquet rejects with exactly the error you see. I don't see an easy fix, but you could search for solutions involving Avro's ReflectData and scala.Option.

As a workaround, you can refrain from using an Option field and use a nullable field instead (you can translate it into an Option with a fancy getter), as sketched below.

In general, if you want more control over the schema, I'd suggest going schema-first: define your Avro schema and use avrohugger to generate the corresponding Scala case class. That way, Option is properly supported. An example schema is shown below as well.
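A minimal sketch of the nullable-field workaround, assuming Avro's @Nullable reflection annotation; the @field meta-annotation and the timeStampOpt getter name are illustrative, not something the thread prescribes:

    import org.apache.avro.reflect.Nullable
    import scala.annotation.meta.field

    case class Log(
      level: String,
      // @Nullable lets Avro's ReflectData infer a nullable union of "null" and
      // "long" instead of an empty scala.Option record; the @field meta-annotation
      // makes Scala attach the annotation to the underlying field so Avro's
      // reflection can see it.
      @(Nullable @field) time_stamp: java.lang.Long = null
    ) {
      // The "fancy getter": expose the nullable field as an Option at call sites.
      def timeStampOpt: Option[Long] = Option(time_stamp).map(_.longValue)
    }

For the schema-first route, a hand-written Avro schema for the same record could look like the following (the namespace is hypothetical); avrohugger would then generate a case class with time_stamp: Option[Long] = None from the nullable union:

    {
      "type": "record",
      "name": "Log",
      "namespace": "com.example.logs",
      "fields": [
        { "name": "level", "type": "string" },
        { "name": "time_stamp", "type": ["null", "long"], "default": null }
      ]
    }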
Best,

Arvid

--
Arvid Heise | Senior Java Developer | Ververica