How to handle Avro BYTES type in Flink


How to handle Avro BYTES type in Flink

Catlyn Kong
Hi fellow streamers,

I'm trying to support the Avro BYTES type in my Flink application. Since ByteBuffer isn't a supported type, I'm converting the field to an Array[Byte]:

case Type.BYTES =>
  (avroObj: AnyRef) => {
    if (avroObj == null) {
      null
    } else {
      // copy the buffer's remaining bytes into a fresh array
      val byteBuffer = avroObj.asInstanceOf[ByteBuffer]
      val bytes = new Array[Byte](byteBuffer.remaining())
      byteBuffer.get(bytes)
      bytes
    }
  }
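For reference, here is a minimal, self-contained sketch of the same conversion (assuming the field arrives as a java.nio.ByteBuffer; the helper name toBytes is mine, not from the snippet above). Working on a duplicate() leaves the original buffer's position untouched, which matters if the same record is ever read twice:

```scala
import java.nio.ByteBuffer

// Sketch: copy an Avro BYTES value (a ByteBuffer) into an Array[Byte].
// duplicate() shares the content but has its own position/limit, so
// reading from it does not consume the original buffer.
def toBytes(avroObj: AnyRef): Array[Byte] =
  if (avroObj == null) null
  else {
    val buf = avroObj.asInstanceOf[ByteBuffer].duplicate()
    val bytes = new Array[Byte](buf.remaining())
    buf.get(bytes)
    bytes
  }

val src = ByteBuffer.wrap(Array[Byte](1, 2, 3))
val out = toBytes(src)
// out is Array(1, 2, 3); src is still readable, since only the duplicate advanced
```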


And in the table schema, I'm using PrimitiveArrayTypeInfo[Byte] for this field.
I'm getting an ArrayIndexOutOfBoundsException:

Caused by: java.lang.ArrayIndexOutOfBoundsException: 40
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:416)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)

Does anyone have experience deserializing the BYTES type from Avro and making it compatible with the Table API? Wondering if it's because I didn't use the correct type, or maybe I need to verify that there's enough data left in the source?
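On the "enough data left" question, a small sketch of why the copy itself can't overrun: the destination array is sized from remaining(), so get(bytes) always has exactly as many bytes as it asks for. (If the destination were larger than what remains, get would throw BufferUnderflowException, not ArrayIndexOutOfBoundsException, which suggests the trace above comes from Avro's schema resolution rather than this copy.)

```scala
import java.nio.ByteBuffer

val buf = ByteBuffer.wrap(Array[Byte](10, 20))
buf.get()                                    // consume one byte; one remains
val bytes = new Array[Byte](buf.remaining()) // sized to exactly what's left
buf.get(bytes)                               // can never overrun: length == remaining
// bytes is Array(20)
```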

Any input is appreciated.

Thanks!
Catlyn

Re: How to handle Avro BYTES type in Flink

Catlyn Kong
Turns out there was some other deserialization problem unrelated to this.

On Mon, Sep 9, 2019 at 11:15 AM Catlyn Kong <[hidden email]> wrote:

Re: How to handle Avro BYTES type in Flink

Fabian Hueske-2
Thanks for reporting back, Catlyn!

On Thu, Sep 12, 2019 at 7:40 PM Catlyn Kong <[hidden email]> wrote:
Turns out there was some other deserialization problem unrelated to this.
