Exception with Avro Serialization on RocksDBStateBackend

Biplob Biswas
Hi,

I am getting the following exception in my code. Something goes wrong while serializing my object, whose class looks like this:

https://gist.github.com/revolutionisme/1eea5ccf5e1d4a5452f27a1fd5c05ff1

The immediate cause seems to be a field inside my nested object that is null (reversalIndicator), but it is not clear to me why that throws an exception. Interestingly, when I serialized the same object with Kryo before, it worked without any issues. Is this a requirement of the Avro serializer, a bug, or a problem on my end?



2017-08-22 17:21:48,892 ERROR com.airplus.poc.flink.statefulFunctions.UpdateTxnState        - Something unexpected happened - probably malformed event
java.lang.RuntimeException: Error while adding data to RocksDB
        at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:102)
        at com.airplus.poc.flink.statefulFunctions.UpdateTxnState.processElement(UpdateTxnState.java:98)
        at com.airplus.poc.flink.statefulFunctions.UpdateTxnState.processElement(UpdateTxnState.java:1)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException: in com.airplus.poc.flink.model.TransactionStateModel in com.airplus.poc.generated.xjc.RecordReadEventType in com.airplus.poc.generated.xjc.RawTransactionItemType in string null of string in field reversalIndicator of com.airplus.poc.generated.xjc.RawTransactionItemType in field rawTransactionItem of com.airplus.poc.generated.xjc.RecordReadEventType in field recordReadEvent of com.airplus.poc.flink.model.TransactionStateModel
        at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
        at org.apache.flink.api.java.typeutils.runtime.AvroSerializer.serialize(AvroSerializer.java:135)
        at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:99)
        ... 8 more
Caused by: java.lang.NullPointerException



Thanks & Regards,
Biplob

Re: Exception with Avro Serialization on RocksDBStateBackend

Till Rohrmann
Hi Biplob,

Have you told Avro to allow null for the fields in your schema? If so, could you share the Avro schema, the Flink version, and the Avro version with us? That would help us understand the problem.

Cheers,
Till


Re: Exception with Avro Serialization on RocksDBStateBackend

Biplob Biswas
Hi Till,

Thanks for the response. I was assuming that the AvroSerializer would derive the corresponding Avro schema from the object class I provide. With that in mind, I did the following:

// Use Flink's AvroSerializer for the state type.
AvroSerializer<TransactionStateModel> txnAvroSerde = new AvroSerializer<>(TransactionStateModel.class);
ValueStateDescriptor<TransactionStateModel> stateDescriptor =
        new ValueStateDescriptor<>("transaction", txnAvroSerde);

// Expose the state for queries under the name "transaction".
stateDescriptor.setQueryable("transaction");

this.txnState = getRuntimeContext().getState(stateDescriptor);

By doing this, I expected the Avro serializer to convert my data into Avro format, using a schema derived from my class, and to store the data in that format in the RocksDB state backend.

Is my assumption wrong? I am using Flink 1.3.2 together with the flink-avro_2.10 library for the same Flink version, which internally uses Avro 1.7.7.
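
For context: Avro's reflection-based schema generation treats every field as non-nullable unless told otherwise, which would explain why a null String is rejected here while Kryo accepted it. In Avro itself the opt-in is the org.apache.avro.reflect.Nullable field annotation or the ReflectData.AllowNull variant of ReflectData; whether Flink 1.3.2's AvroSerializer can be pointed at the latter is not established in this thread. The sketch below is only a toy model of that behaviour in plain Java, not Avro's implementation. MayBeNull, StrictItem, LenientItem and write are hypothetical names invented for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

/** Toy model of a schema-driven writer. This is NOT Avro's implementation. */
final class StrictWriterSketch {

    /** Hypothetical stand-in for a "this field may be null" marker. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface MayBeNull {}

    /** Hypothetical record whose null field is not marked as nullable. */
    static final class StrictItem {
        String amount = "10.00";
        String reversalIndicator; // left null on purpose
    }

    /** Same shape, but the field is explicitly allowed to be null. */
    static final class LenientItem {
        String amount = "10.00";
        @MayBeNull String reversalIndicator; // left null on purpose
    }

    /** Writes every instance field as name=value; rejects unmarked nulls. */
    static String write(Object record) {
        StringBuilder out = new StringBuilder();
        for (Field field : record.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue; // only instance fields belong to the "schema"
            }
            Object value;
            try {
                field.setAccessible(true);
                value = field.get(record);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
            if (value == null && !field.isAnnotationPresent(MayBeNull.class)) {
                throw new NullPointerException("null in field " + field.getName()
                        + " of " + record.getClass().getSimpleName());
            }
            out.append(field.getName()).append('=').append(value).append(';');
        }
        return out.toString();
    }
}
```

Calling write on a StrictItem throws a NullPointerException that names reversalIndicator, while the same call on a LenientItem succeeds. With real Avro, the analogous change would be marking the field as nullable in the class the schema is derived from, which may be awkward for generated classes such as the xjc ones in the stack trace.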

Thanks,
Biplob

Re: Exception with Avro Serialization on RocksDBStateBackend

Biplob Biswas
Hi,

I am still stuck on this and have not found a way to make Avro accept
null values.

Any help here would be really appreciated.
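
As a diagnostic aid while this is open, one option is to walk the object before it is written to state and list every null field, so it is clear which fields would have to be made nullable or given defaults. The following is a minimal sketch in plain Java under stated assumptions: RawItem, ReadEvent and StateModel are hypothetical stand-ins for the nested model classes named in the exception, and findNullFields is an invented helper, not part of Flink or Avro.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Lists the paths of all null fields in a nested object (diagnostic sketch). */
final class NullFieldFinder {

    // Hypothetical stand-ins for the nested model classes in the thread.
    static final class RawItem {
        String amount = "10.00";
        String reversalIndicator; // left null on purpose
    }

    static final class ReadEvent {
        RawItem rawTransactionItem = new RawItem();
    }

    static final class StateModel {
        ReadEvent recordReadEvent = new ReadEvent();
    }

    /** Returns dotted paths such as "StateModel.a.b" for every null field. */
    static List<String> findNullFields(Object root) {
        List<String> found = new ArrayList<>();
        walk(root, root.getClass().getSimpleName(), found, 0);
        return found;
    }

    private static void walk(Object obj, String path, List<String> found, int depth) {
        if (depth > 16) {
            return; // crude guard against cyclic object graphs
        }
        // Note: only fields declared on the runtime class are visited,
        // inherited fields are ignored in this sketch.
        for (Field field : obj.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            String fieldPath = path + "." + field.getName();
            Object value;
            try {
                field.setAccessible(true);
                value = field.get(obj);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
            if (value == null) {
                found.add(fieldPath);
            } else if (isModelType(field.getType())) {
                walk(value, fieldPath, found, depth + 1);
            }
        }
    }

    /** Descend only into user classes; skip primitives, arrays, enums and JDK types. */
    private static boolean isModelType(Class<?> type) {
        String name = type.getName();
        return !type.isPrimitive() && !type.isArray() && !type.isEnum()
                && !name.startsWith("java.") && !name.startsWith("javax.");
    }
}
```

Running findNullFields on the object just before the state update, and logging the result, would show whether reversalIndicator is the only null field or just the first one the serializer happened to reach.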

Thanks,
Biplob


