KryoException UnsupportedOperationException when writing Avro GenericRecords to Parquet

Averell
Hi,

I'm trying to convert a stream of JSON strings into a stream of Avro
GenericRecords and write them to Parquet files, but I get the following exception.
It is thrown at the line out.collect(genericRecord). If there's no
sink, then there's no error.
KryoException: java.lang.UnsupportedOperationException

My code is as follows:
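import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// inputStream: DataStream[String], path, schemaString and LOG are defined elsewhere in the job
val parquetSink: StreamingFileSink[GenericRecord] =
  StreamingFileSink
    .forBulkFormat(new Path(path),
      ParquetAvroWriters.forGenericRecord(new Schema.Parser().parse(schemaString)))
    .build()

val parquetStream = inputStream.process(new ProcessFunction[String, GenericRecord] {
  @transient
  private var schema: Schema = _
  @transient
  private var reader: GenericDatumReader[GenericRecord] = _

  override def processElement(value: String,
                              ctx: ProcessFunction[String, GenericRecord]#Context,
                              out: Collector[GenericRecord]): Unit = {
    // Lazily build the schema and reader on the task manager
    if (reader == null) {
      schema = new Schema.Parser().parse(schemaString)
      reader = new GenericDatumReader[GenericRecord](schema)
    }
    try {
      val genericRecord = reader.read(null, DecoderFactory.get.jsonDecoder(schema, value))
      out.collect(genericRecord) // <- the exception is thrown here
    } catch {
      case e: Throwable =>
        LOG.warn(s"Error decoding JSON string: $e\nRaw string: `$value`")
        throw e
    }
  }
})
parquetStream.addSink(parquetSink)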

The schema is a simple one in which all fields are strings.
I tried with both Flink 1.10.0 and 1.11.0 and am currently stuck on this.
Could you please help?

Thanks and regards,
Averell


============
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
        at mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:53)
        at mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:44)
        at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: null
        at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        ... 33 common frames omitted





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: KryoException UnsupportedOperationException when writing Avro GenericRecords to Parquet

Till Rohrmann
Hi Averell,

it looks as if org.apache.avro.Schema$Field contains a field (reserved, according to the serialization trace) that is an unmodifiable collection. When Kryo deserializes this field, it creates an instance of the same unmodifiable collection class and then tries to add the elements to it, which fails with the UnsupportedOperationException you are seeing.
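The failure mode described above can be reproduced without Kryo or Avro. The following stdlib-only Scala sketch assumes, as a simplification, that a collection field is rebuilt by taking a fresh instance of the recorded collection class and calling add() once per element; it is not Kryo's actual code, and `UnmodifiableCopyDemo` and `refill` are hypothetical names.

```scala
import java.util.{ArrayList, Collection, Collections}
import scala.util.Try

// Simplified model (an assumption, not Kryo's actual code) of how a collection
// field is rebuilt during deserialization: take a fresh instance of the recorded
// collection class, then add() each element read from the stream.
object UnmodifiableCopyDemo {

  // Adds every element of `source` to `target`, one add() call at a time,
  // and reports whether that succeeded.
  def refill[T](target: Collection[T], source: Collection[T]): Try[Collection[T]] =
    Try {
      val it = source.iterator()
      while (it.hasNext) target.add(it.next())
      target
    }

  def main(args: Array[String]): Unit = {
    val original = new ArrayList[String]()
    original.add("a")
    original.add("b")

    // A mutable target, which is what the deserializer expects: succeeds.
    println(refill(new ArrayList[String](), original))

    // An unmodifiable target: the first add() throws UnsupportedOperationException,
    // matching Collections$UnmodifiableCollection.add in the "Caused by" part of the trace.
    println(refill(Collections.unmodifiableList(new ArrayList[String]()), original))
  }
}
```

The first call returns a Success holding the two elements; the second returns a Failure wrapping UnsupportedOperationException, because an unmodifiable view rejects add() regardless of its contents.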

I would recommend using Flink's AvroSerializer for serializing GenericRecords instead. To do so, add org.apache.flink:flink-avro as a dependency to your job and then tell the system to use GenericRecordAvroTypeInfo, for example via:

DataStream<GenericRecord> sourceStream =
    env.addSource(new AvroGenericSource())
        .returns(new GenericRecordAvroTypeInfo(schema));

You can find more information about it here [1].
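Applied to the Scala code in the question, a minimal sketch could look as follows. In the Scala DataStream API, process() receives its result type as an implicit TypeInformation parameter, so GenericRecordAvroTypeInfo can be passed explicitly in a second argument list. The stack trace also shows why the error only appears once a sink is attached: the failure happens in KryoSerializer.copy, called from OperatorChain$CopyingChainingOutput, i.e. when each record is copied on its way to the chained sink. This assumes Flink 1.10/1.11 with flink-avro plus the Parquet dependencies the job already uses; `ParquetFromJsonFixed` and `attachParquetSink` are hypothetical names, and the error logging from the question is omitted for brevity.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical helper: the pipeline from the question, but with the GenericRecord
// stream typed as GenericRecordAvroTypeInfo so records are copied with Flink's
// Avro serializer instead of the generic Kryo fallback.
object ParquetFromJsonFixed {

  def attachParquetSink(inputStream: DataStream[String], schemaString: String, path: String): Unit = {
    val schema = new Schema.Parser().parse(schemaString)

    val parquetStream: DataStream[GenericRecord] = inputStream.process(
      new ProcessFunction[String, GenericRecord] {
        // Not shipped with the function; rebuilt lazily on the task manager
        // from `schemaString`, which is a plain serializable String.
        @transient private var readerSchema: Schema = _
        @transient private var reader: GenericDatumReader[GenericRecord] = _

        override def processElement(
            value: String,
            ctx: ProcessFunction[String, GenericRecord]#Context,
            out: Collector[GenericRecord]): Unit = {
          if (reader == null) {
            readerSchema = new Schema.Parser().parse(schemaString)
            reader = new GenericDatumReader[GenericRecord](readerSchema)
          }
          out.collect(reader.read(null, DecoderFactory.get.jsonDecoder(readerSchema, value)))
        }
      }
    )(new GenericRecordAvroTypeInfo(schema)) // explicit TypeInformation for the result stream

    val parquetSink: StreamingFileSink[GenericRecord] =
      StreamingFileSink
        .forBulkFormat(new Path(path), ParquetAvroWriters.forGenericRecord(schema))
        .build()

    parquetStream.addSink(parquetSink)
  }
}
```

Passing the type information explicitly keeps the change local to the one operator whose output is a GenericRecord; the sink itself is unchanged from the question.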


Cheers,
Till

(In reply to Averell's message of Wed, Oct 21, 2020 at 1:48 PM.)

Re: KryoException UnsupportedOperationException when writing Avro GenericRecords to Parquet

Averell
Hello Till,

Adding GenericRecordAvroTypeInfo(schema) does help.

Thanks and best regards,
Averell


