Thanks, Gordon, for the suggestion.
I am following this repo: https://github.com/mrooding/flink-avro-state-serialization
So far I can alter the Scala case classes and restore from a savepoint when using the memory state backend, but when I use RocksDB as the state backend and try to restore from the savepoint, it breaks with the following error:
org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB.
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:92)
    at nl.mrooding.ProductProcessor.processElement1(ProductProcessor.scala:14)
    at nl.mrooding.ProductProcessor.processElement1(ProductProcessor.scala:8)
    at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238)
    at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
    at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
    at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at nl.mrooding.state.CustomAvroSerializer$class.deserialize(CustomAvroSerializer.scala:42)
    at nl.mrooding.state.ProductSerializer.deserialize(ProductSerializer.scala:9)
    at nl.mrooding.state.ProductSerializer.deserialize(ProductSerializer.scala:9)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)
    ... 8 more

On Wed, Mar 18, 2020 at 10:56 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Apoorv,
Flink currently does not natively support schema evolution for state types using Scala case classes [1].
So, as Roman has pointed out, there are two possible ways for you to do that:
- Implementing a custom serializer that supports schema evolution for your specific Scala case classes, as Roman suggested.
- Using the State Processor API [2] to migrate your case classes offline as a batch job.
For your question on how to implement a serializer that supports schema evolution, can you share with me the problems you have run into so far?
Otherwise, if you take a look at the PojoSerializerSnapshot class, that would be a starting point to implement something similar for your case classes.
As you will quickly realize, it's not simple, so I would strongly suggest trying out the approach of using the State Processor API.
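For the first option, the core idea can be sketched without any Flink classes. The following is only an illustration in plain Scala under stated assumptions: `ProductV1`, `ProductV2` and `VersionedProductCodec` are hypothetical names, and the per-value version tag merely stands in for the writer-side information that a real serializer snapshot would record. It shows that reading bytes written in the old layout with the new layout runs off the end of the data (an `EOFException`), while a reader that knows which layout was written can fill in a default for the new field.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical state types: ProductV1 is the layout already stored in the
// savepoint, ProductV2 is the evolved layout with one additional field.
final case class ProductV1(id: String, price: Int)
final case class ProductV2(id: String, price: Int, stock: Int)

object VersionedProductCodec {
  // Runs `body` against a fresh in-memory stream and returns the written bytes.
  private def encode(body: DataOutputStream => Unit): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    body(out)
    out.flush()
    bytes.toByteArray
  }

  // Old writer without a version tag: just the fields in order.
  def writeUntaggedV1(p: ProductV1): Array[Byte] =
    encode { out => out.writeUTF(p.id); out.writeInt(p.price) }

  // Naive new reader: assumes every stored value already has the V2 layout.
  // On V1 bytes the last readInt() hits the end of the data and throws EOFException.
  def readUntaggedAsV2(data: Array[Byte]): ProductV2 = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    ProductV2(in.readUTF(), in.readInt(), in.readInt())
  }

  // Tagged writers: the first int records which layout follows.
  def writeV1(p: ProductV1): Array[Byte] =
    encode { out => out.writeInt(1); out.writeUTF(p.id); out.writeInt(p.price) }

  def writeV2(p: ProductV2): Array[Byte] =
    encode { out => out.writeInt(2); out.writeUTF(p.id); out.writeInt(p.price); out.writeInt(p.stock) }

  // Version-aware reader: decodes with the layout that was written and maps
  // the result to the current class, using 0 as an assumed default for `stock`.
  def read(data: Array[Byte]): ProductV2 = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    in.readInt() match {
      case 1 => ProductV2(in.readUTF(), in.readInt(), 0)
      case 2 => ProductV2(in.readUTF(), in.readInt(), in.readInt())
      case v => throw new IllegalStateException(s"Unknown layout version: $v")
    }
  }
}
```

With this sketch, `VersionedProductCodec.read(VersionedProductCodec.writeV1(ProductV1("p-1", 10)))` yields `ProductV2("p-1", 10, 0)`, whereas `readUntaggedAsV2` on untagged V1 bytes fails with an `EOFException`.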
Either way, if you bump into any problems, feel free to let me know.
Cheers,
Gordon
[1] https://issues.apache.org/jira/browse/FLINK-10896
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

On Wed, Mar 18, 2020 at 1:04 PM Apoorv Upadhyay <[hidden email]> wrote:
Thanks a lot. Could you also share an example where this has been implemented? I have gone through the docs, but I still cannot get it to work.

On Wed, Feb 26, 2020 at 7:59 PM Khachatryan Roman <[hidden email]> wrote:
Hi Apoorv,
You can achieve this by implementing custom serializers for your state.
Please refer to https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/custom_serialization.html
Regards,
Roman

On Wed, Feb 26, 2020 at 6:53 AM Apoorv Upadhyay <[hidden email]> wrote:
Hi Roman,
I have successfully migrated to Flink 1.8.2 with the savepoint created by Flink 1.6.2.
Now I have to modify a few case classes because of a new requirement. I created a savepoint, and when I run the app with the modified classes from that savepoint, it throws the error "state not compatible".
Previously no serializer was used.
I now want to support state schema evolution, so I need suggestions on how I can achieve that.
Regards

On Tue, Feb 25, 2020 at 9:08 PM Khachatryan Roman <[hidden email]> wrote:
Hi ApoorvK,
I understand that you have a savepoint created by Flink 1.6.2 and you want to use it with Flink 1.8.2. The classes themselves weren't modified.
Is that correct?
Which serializer did you use?
Regards,
Roman

On Tue, Feb 25, 2020 at 8:38 AM ApoorvK <[hidden email]> wrote:
Hi Team,
Earlier we developed on Flink 1.6.2, so there are lots of case classes
that contain a Map or nested case classes, for example:
case class MyCaseClass(var a: Boolean,
                       var b: Boolean,
                       var c: Boolean,
                       var d: NestedCaseClass,
                       var e: Int) {
  // No-argument constructor; it must supply all five fields.
  def this() = this(false, false, false, new NestedCaseClass, 0)
}
Now that we have migrated to Flink 1.8.2, I need help figuring out how to
achieve state schema evolution for such classes.
1. Would creating Avro schemas for these classes and implementing Avro
serialisation work?
2. Or should I register a Kryo serializer with a Protobuf serializer on the env?
Please suggest what can be done here, or point me to an Avro
serialisation example.
Thanks
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/