Hi,
I'm trying to convert a stream of JSON strings into a stream of Avro GenericRecords and write them to Parquet files, but I get the exception below. It is thrown at the line out.collect(genericRecord). If there's no sink, there's no error.

KryoException: java.lang.UnsupportedOperationException

My code is as follows:

    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    import org.apache.avro.io.DecoderFactory
    import org.apache.flink.core.fs.Path
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    val parquetSink: StreamingFileSink[GenericRecord] = StreamingFileSink
      .forBulkFormat(new Path(path), ParquetAvroWriters.forGenericRecord(new Schema.Parser().parse(schemaString)))
      .build()

    val parquetStream = inputStream.process(new ProcessFunction[String, GenericRecord] {
      @transient private var schema: Schema = _
      @transient private var reader: GenericDatumReader[GenericRecord] = _

      override def processElement(value: String,
                                  ctx: ProcessFunction[String, GenericRecord]#Context,
                                  out: Collector[GenericRecord]): Unit = {
        // Lazily create the schema and reader once per function instance
        if (reader == null) {
          schema = new Schema.Parser().parse(schemaString)
          reader = new GenericDatumReader[GenericRecord](schema)
        }
        try {
          val genericRecord = reader.read(null, DecoderFactory.get.jsonDecoder(schema, value))
          out.collect(genericRecord)
        } catch {
          case e: Throwable =>
            // value is already a String, so interpolate it directly
            LOG.warn(s"Error decoding JSON string: $e\nRaw string: `$value`")
            throw e
        }
      }
    })

    parquetStream.addSink(parquetSink)

The schema is a simple one in which all fields are strings. I tried both Flink 1.10.0 and 1.11.0 and am currently stuck on this. Could you please help?
Thanks and regards,
Averell

============

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:53)
    at mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:44)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: null
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 33 common frames omitted

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi Averell,

it looks as if org.apache.avro.Schema$Field contains a field that is an unmodifiable collection (the reserved field in the serialization trace). Kryo deserializes such a field by creating a new instance of the unmodifiable collection class and then trying to add the elements to it, which fails. In your job this happens in KryoSerializer.copy, when the record is handed from your ProcessFunction to the chained sink. That is why you see no error without the sink.

I would recommend using the AvroSerializer for serializing GenericRecords. Add org.apache.flink:flink-avro as a dependency to your job. Then tell the system to use GenericRecordAvroTypeInfo for the stream, e.g. via

    DataStream<GenericRecord> sourceStream = env.addSource(new AvroGenericSource())
        .returns(new GenericRecordAvroTypeInfo(schema));

You can find more information about it here [1].

Cheers,
Till

On Wed, Oct 21, 2020 at 1:48 PM Averell <[hidden email]> wrote:
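The failure described above can be reproduced with the JDK alone. Below is a minimal Java sketch of the idea, not Kryo's actual code. The class UnmodifiableCopyDemo and its helpers naiveCopy and tryCopy are hypothetical. The sketch assumes, as the trace suggests, that Kryo's CollectionSerializer first obtains an empty instance of the same concrete collection class and then calls add() for each element.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical demo class: a simplified model of the copy step that fails in the trace.
public class UnmodifiableCopyDemo {

    // Hypothetical stand-in for a generic collection deserializer: given a fresh instance
    // of the same concrete collection class as the source, add() every element into it.
    static <T> Collection<T> naiveCopy(Collection<T> source, Collection<T> freshInstance) {
        for (T element : source) {
            freshInstance.add(element); // unmodifiable collections throw here
        }
        return freshInstance;
    }

    // Returns true if the copy succeeded, false if add() threw UnsupportedOperationException.
    static boolean tryCopy(Collection<String> source, Collection<String> freshInstance) {
        try {
            naiveCopy(source, freshInstance);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Illustrative stand-in for an unmodifiable set such as Schema$Field's reserved field;
        // the element values are made up for this example.
        Set<String> reserved = Collections.unmodifiableSet(
                new HashSet<String>(Arrays.asList("doc", "name", "type")));

        // Copying into an ordinary HashSet works.
        boolean hashSetOk = tryCopy(reserved, new HashSet<String>());

        // A "fresh" Collections$UnmodifiableSet rejects every add(), as in the stack trace.
        boolean unmodifiableOk = tryCopy(reserved, Collections.unmodifiableSet(new HashSet<String>()));

        System.out.println("HashSet target copied: " + hashSetOk);              // true
        System.out.println("UnmodifiableSet target copied: " + unmodifiableOk); // false
    }
}
```

Running main prints true for the HashSet target and false for the unmodifiable one. This mirrors the Collections$UnmodifiableCollection.add frame at the bottom of the trace. Presumably the serializer behind GenericRecordAvroTypeInfo avoids the problem because it uses Avro's own schema-driven (de)serialization instead of reflecting over the Schema object's internal fields.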
Hello Till,
Adding GenericRecordAvroTypeInfo(schema) does help.

Thanks and best regards,
Averell