Flink Parquet Streaming FileSink with scala case class with optional fields error


Flink Parquet Streaming FileSink with scala case class with optional fields error

Vikash Dat
I have defined a streaming file sink for parquet to store my scala case class. 

StreamingFileSink
  .forBulkFormat(
    new Path(appArgs.datalakeBucket),
    ParquetAvroWriters.forReflectRecord(classOf[Log])
  )
  .withBucketAssigner(new TransactionLogHiveBucketAssigner())
  .build()


where my case class is

case class Log(
  level: String,
  time_stamp: Option[Long] = None
)


When Flink tries to write a specific instance to Parquet, e.g.

Log("info", Some(1596975950000L))


it throws the following error:


org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: required group time_stamp {
}
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
    at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
    at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
    at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:280)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:283)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:87)
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
    at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:222)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:274)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:445)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:711)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:664)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:36)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:373)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)

Can Flink Parquet not handle fields of type Option?

Thanks,





Re: Flink Parquet Streaming FileSink with scala case class with optional fields error

Arvid Heise-3
Hi Vikash,

The error is coming from Parquet itself, in conjunction with Avro (which is used to infer the schema of your Scala class). The inferred schema is

{
    "fields": [
        {
            "name": "level",
            "type": "string"
        },
        {
            "name": "time_stamp",
            "type": {
                "fields": [],
                "name": "Option",
                "namespace": "scala",
                "type": "record"
            }
        }
    ],
    "name": "Log",
    "namespace": "org.apache.flink.formats.parquet.avro",
    "type": "record"
}

As you can see, Avro treats Option like any arbitrary class when inferring the schema. Since scala.Option has no fields of its own, the resulting empty group is rejected by Parquet, which is the error you see.
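You can reproduce the inference outside Flink with a quick sketch that uses only Avro's reflection API (assuming your Log class is on the classpath):

```scala
import org.apache.avro.reflect.ReflectData

// Prints the Avro schema that forReflectRecord derives for Log --
// note the empty "Option" record inferred for time_stamp.
object InspectSchema extends App {
  println(ReflectData.get().getSchema(classOf[Log]).toString(true))
}
```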

I don't see an easy fix, but you could search for solutions that combine Avro's ReflectData with scala.Option. As a workaround, you can refrain from using an Option field and use a nullable field instead (you can still expose it as an Option through a fancy getter).
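A minimal, untested sketch of that workaround (field names taken from the example above; timestampOpt is just an illustrative getter name):

```scala
import org.apache.avro.reflect.Nullable
import scala.annotation.meta.field

// The @field meta-annotation ensures @Nullable lands on the generated
// Java field, which is what Avro's ReflectData inspects. The field is
// then written as an optional int64 instead of an empty group.
case class Log(
  level: String,
  @(Nullable @field) time_stamp: java.lang.Long = null
) {
  // "Fancy getter": expose the nullable field as an Option to Scala callers.
  def timestampOpt: Option[Long] = Option(time_stamp).map(_.longValue)
}
```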

In general, if you want more control over the schema, I'd suggest going schema-first: define your Avro schema and use avrohugger to generate the corresponding Scala case class. That way, Option is properly supported.
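For example, a schema along these lines (the namespace is illustrative) makes avrohugger generate a case class where time_stamp is an Option[Long], because nullable unions map to Option:

```json
{
  "type": "record",
  "name": "Log",
  "namespace": "com.example",
  "fields": [
    {"name": "level", "type": "string"},
    {"name": "time_stamp", "type": ["null", "long"], "default": null}
  ]
}
```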

Best,

Arvid



On Wed, Aug 12, 2020 at 2:43 AM Vikash Dat <[hidden email]> wrote:

--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng