Re: Issues while writing data to a parquet sink

Till Rohrmann
Hi Adi,

To me, this looks like a version conflict. An IncompatibleClassChangeError of the form "class X has interface Y as super class" usually means that X was compiled against a version of the dependency in which Y was a class, while the version loaded at runtime declares Y as an interface. Maybe you are using different Avro versions in your user program and on your Flink cluster. Could you check that you don't have conflicting versions on your classpath (e.g. with mvn dependency:tree)? It would also be helpful to have a minimal example that allows reproducing the problem (e.g. a repo with the Flink job and test data).
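
One quick way to see which jar actually provides the Avro classes at runtime is to print their code source locations, using the same classpath as the job (a minimal sketch; run it on the cluster side as well as locally):

    // AvroClasspathCheck.java -- minimal sketch to locate the Avro classes
    // named in the error. Compare the two printed locations: if they point
    // at different jars, or at an unexpected fat jar, that is the conflict.
    public class AvroClasspathCheck {
        public static void main(String[] args) {
            System.out.println(org.apache.avro.LogicalType.class
                    .getProtectionDomain().getCodeSource().getLocation());
            System.out.println(org.apache.avro.LogicalTypes.class
                    .getProtectionDomain().getCodeSource().getLocation());
        }
    }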

I am also forwarding this mail to the user ML. Maybe some other community members have seen this problem before.

Cheers,
Till

On Fri, May 14, 2021 at 9:53 PM Adishesh Kishore <[hidden email]> wrote:
Hi Till,

I am using an Avro schema to write data to a Parquet sink. For some reason I am getting the following stack trace, even though I am not using a decimal logical type anywhere:

java.lang.IncompatibleClassChangeError: class org.apache.avro.LogicalTypes$Decimal has interface org.apache.avro.LogicalType as super class
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:180)
    at org.apache.parquet.avro.AvroSchemaConverter.convertUnion(AvroSchemaConverter.java:214)
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:171)
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:227)
    at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:124)
    at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:115)
    at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:150)
    at org.apache.parquet.avro.AvroParquetWriter.access$200(AvroParquetWriter.java:36)
    at org.apache.parquet.avro.AvroParquetWriter$Builder.getWriteSupport(AvroParquetWriter.java:182)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:529)
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:87)
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forGenericRecord$abd75386$1(ParquetAvroWriters.java:61)
    at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:222)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:378)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingParquetSink.invoke(StreamingParquetSink.java:555)

I am using the following schema to create the writer (a simplified sketch of the writer setup follows below the schema):

{
  "type": "error",
  "name": "root",
  "namespace": "root",
  "doc": "",
  "fields": [
    {
      "name": "account_id",
      "type": [
        "long",
        "null"
      ],
      "doc": "",
      "default": 0
    },
    {
      "name": "amount",
      "type": [
        "long",
        "null"
      ],
      "doc": "",
      "default": 0
    },
    {
      "name": "date",
      "type": [
        {
          "type": "string",
          "logicalType": "date"
        },
        "null"
      ],
      "doc": "",
      "default": ""
    },
    {
      "name": "duration",
      "type": [
        "long",
        "null"
      ],
      "doc": "",
      "default": 0
    },
    {
      "name": "loan_id",
      "type": [
        "long",
        "null"
      ],
      "doc": "",
      "default": 0
    },
    {
      "name": "payments",
      "type": [
        "double",
        "null"
      ],
      "doc": "",
      "default": 0
    },
    {
      "name": "status",
      "type": [
        "string",
        "null"
      ],
      "doc": "",
      "default": ""
    }
  ]
}
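
For reference, the writer is created through ParquetAvroWriters.forGenericRecord and Flink's bulk-format file sink, essentially as in the sketch below (simplified; schemaJson, outputPath and stream are placeholders, and the StreamingParquetSink in the trace wraps this standard path):

    // imports: org.apache.avro.Schema, org.apache.avro.generic.GenericRecord,
    //          org.apache.flink.core.fs.Path,
    //          org.apache.flink.formats.parquet.avro.ParquetAvroWriters,
    //          org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

    // Parse the Avro schema posted above and build a bulk-format Parquet sink.
    Schema schema = new Schema.Parser().parse(schemaJson);
    StreamingFileSink<GenericRecord> sink = StreamingFileSink
            .forBulkFormat(new Path(outputPath),
                    ParquetAvroWriters.forGenericRecord(schema))
            .build();
    stream.addSink(sink);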


Appreciate any help in advance!

Thanks,
Adi