state schema evolution for case classes


ApoorvK
Hi Team,

We originally developed on Flink 1.6.2, so there are lots of case classes that contain Maps and nested case classes, for example:

case class MyCaseClass(var a: Boolean,
                       var b: Boolean,
                       var c: Boolean,
                       var d: NestedCaseClass,
                       var e: Int) {
  // Zero-argument auxiliary constructor must supply a default for every field.
  def this() { this(false, false, false, new NestedCaseClass, 0) }
}


We have now migrated to Flink 1.8.2, and I need help figuring out how I can
achieve state schema evolution for such classes.

1. If we now create Avro schemas for these classes and implement Avro serialisation, will that work?
2. Or should I register the Kryo serializer with a Protobuf serializer on the environment?

Please suggest what can be done here, or point me to an example of Avro serialisation.

Thanks

Re: state schema evolution for case classes

r_khachatryan
Hi ApoorvK,

I understand that you have a savepoint created by Flink 1.6.2 and you want to use it with Flink 1.8.2. The classes themselves weren't modified. 
Is that correct?
Which serializer did you use?

Regards,
Roman


Re: state schema evolution for case classes

ApoorvK
Hi Roman,

I have successfully migrated to Flink 1.8.2 with the savepoint created by Flink 1.6.2.
Now I have to modify a few case classes due to a new requirement. I have created a savepoint, and when I run the app with the modified classes from that savepoint it throws a "state not compatible" error.
Previously no custom serializer was used.
I now wish to support state schema evolution, so I need suggestions on how I can achieve that.

Regards

Re: state schema evolution for case classes

r_khachatryan
Hi Apoorv,

You can achieve this by implementing custom serializers for your state.

Regards,
Roman


Re: state schema evolution for case classes

ApoorvK
Thanks a lot. Also, could you share an example where this has been implemented? I have gone through the docs, but it still does not work for me.

Re: state schema evolution for case classes

Tzu-Li (Gordon) Tai
Hi Apoorv,

Flink currently does not natively support schema evolution for state types using Scala case classes [1].

So, as Roman has pointed out, there are two possible ways for you to do that:
- implementing a custom serializer that supports schema evolution for your specific Scala case classes, as Roman suggested, or
- using the State Processor API [2] to migrate your case classes offline as a batch job.

For your question on how to implement a schema-evolution-supporting serializer, can you share with me the problems you have met so far?
Otherwise, if you take a look at the PojoSerializerSnapshot class, it would be a starting point for implementing something similar for your case classes.

As you will quickly realize, it's not simple, so I would strongly suggest trying out the approach of using the State Processor API.
Either way, if you bump into any problems, feel free to let me know.

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-10896
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
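
If you do try the State Processor API, a rough sketch of the read side could look like the one below. This is only an illustration: the State Processor API ships from Flink 1.9 onwards, and the operator uid ("my-operator"), the key type (String), the state name and the MyStateValue fields are placeholders, not your actual job.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.state.api.Savepoint
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

// Hypothetical stand-in for the state type kept in a "my-state" ValueState.
case class MyStateValue(id: String, count: Long)

// Reads the "my-state" value for every key of the operator with uid "my-operator".
class MyStateReader extends KeyedStateReaderFunction[String, MyStateValue] {
  @transient private var state: ValueState[MyStateValue] = _

  override def open(parameters: Configuration): Unit = {
    // Describe the state the same way the old job did (classOf), so the existing bytes can be read.
    val descriptor = new ValueStateDescriptor[MyStateValue]("my-state", classOf[MyStateValue])
    state = getRuntimeContext.getState(descriptor)
  }

  override def readKey(key: String,
                       ctx: KeyedStateReaderFunction.Context,
                       out: Collector[MyStateValue]): Unit = {
    out.collect(state.value())
  }
}

object ReadOldSavepoint {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Load the savepoint with the same state backend that created it (RocksDB in your case).
    val savepoint = Savepoint.load(
      env, "file:///tmp/old-savepoint", new RocksDBStateBackend("file:///tmp/checkpoints"))
    val oldState: DataSet[MyStateValue] =
      savepoint.readKeyedState("my-operator", new MyStateReader)
    oldState.print()
  }
}

The DataSet you get back can then be transformed and bootstrapped into a fresh savepoint with whatever serializer you like.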

Re: state schema evolution for case classes

ApoorvK
Thanks Gordon for the suggestion,

I am following this repo: https://github.com/mrooding/flink-avro-state-serialization

So far I am able to alter the Scala case classes and restore from a savepoint using the memory state backend, but when I use RocksDB as the state backend and try to restore from a savepoint, it breaks with the following error:

org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB.
	at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:92)
	at nl.mrooding.ProductProcessor.processElement1(ProductProcessor.scala:14)
	at nl.mrooding.ProductProcessor.processElement1(ProductProcessor.scala:8)
	at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238)
	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
	at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
	at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
	at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
	at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
	at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
	at nl.mrooding.state.CustomAvroSerializer$class.deserialize(CustomAvroSerializer.scala:42)
	at nl.mrooding.state.ProductSerializer.deserialize(ProductSerializer.scala:9)
	at nl.mrooding.state.ProductSerializer.deserialize(ProductSerializer.scala:9)
	at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)
	... 8 more



Re: state schema evolution for case classes

Tzu-Li (Gordon) Tai
Hi Apoorv,

Sorry for the late reply, I have been quite busy with backlog items over the past few days.

On Fri, Mar 20, 2020 at 4:37 PM Apoorv Upadhyay <[hidden email]> wrote:
Thanks Gordon for the suggestion,

I am following this repo: https://github.com/mrooding/flink-avro-state-serialization

So far I am able to alter the Scala case classes and restore from a savepoint using the memory state backend, but when I use RocksDB as the state backend and try to restore from a savepoint, it breaks with the following error:

When you say restoring it with the RocksDB backend, was the savepoint you are attempting to restore from taken with the RocksDB backend as well?
I'm asking because currently you cannot change the state backend across restores, as the backends have different savepoint binary formats.
This is also the case when you use the State Processor API: when you load an existing savepoint, you first have to load it with the same state backend that was used to create it. You can then change the state backend using the State Processor API by creating a new savepoint with your desired target backend and dumping all the state data extracted from the loaded savepoint into the fresh savepoint.
There have been previous proposals (FLIP-41) [1] to unify the savepoint formats, which would make a lot of this easier, but AFAIK this isn't on the roadmap for the near future.

Best Regards,
Gordon

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State
 

Re: state schema evolution for case classes

ApoorvK
Hi Gordon,

Thanks for your response. I have done a POC on state migration using Avro, and it seems to work out well.

I am using a custom Avro serializer (built around an Avro schema and a TypeSerializer/TypeSerializerSnapshot pair), and based on that I have written my own custom serializer for the Scala case class that I am serialising (I am using RocksDB as the state backend).

So when I evolve the class with a field of any datatype, I just change the avsc (Avro schema JSON) and provide both the old and the new schema, so that the data already in RocksDB is read with the old schema and written back with the new one, and it works just fine. So I can add new classes to my application with schema evolution support.
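
For illustration, here is a minimal, self-contained sketch of the writer/reader schema resolution this relies on (the record name, fields and the added field are made up for the example, they are not our real classes):

import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroEvolutionDemo {

  // Old (writer) schema: the shape of the bytes that already sit in RocksDB.
  val oldSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"Example","fields":[
      |  {"name":"id","type":"string"},
      |  {"name":"value","type":"long"}
      |]}""".stripMargin)

  // New (reader) schema: adds a field with a default, which Avro can resolve against old data.
  val newSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"Example","fields":[
      |  {"name":"id","type":"string"},
      |  {"name":"value","type":"long"},
      |  {"name":"flag","type":"boolean","default":false}
      |]}""".stripMargin)

  def main(args: Array[String]): Unit = {
    // Write a record with the old schema, as the old job would have done.
    val record = new GenericData.Record(oldSchema)
    record.put("id", "abc")
    record.put("value", 42L)
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](oldSchema).write(record, encoder)
    encoder.flush()

    // Read the old bytes with both schemas: Avro fills in the new field from its default.
    val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
    val reader = new GenericDatumReader[GenericRecord](oldSchema, newSchema)
    val evolved = reader.read(null, decoder)
    println(evolved) // {"id": "abc", "value": 42, "flag": false}
  }
}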

I have defined the state like this:

private[this] lazy val stateDescriptorTest: ValueStateDescriptor[TestDataNested] =
  new ValueStateDescriptor[TestDataNested]("testdata-join", TestDataNested.serializer)
private[this] lazy val stateTest: ValueState[TestDataNested] =
  getRuntimeContext.getState(stateDescriptorTest)

Now the problem: for the existing classes in my current application we have defined the state as follows (for example):

private[this] lazy val stateDescriptorTest: ValueStateDescriptor[TestDataNested] =
  new ValueStateDescriptor[TestDataNested]("testdata-join", classOf[TestDataNested])
private[this] lazy val stateTest: ValueState[TestDataNested] =
  getRuntimeContext.getState(stateDescriptorTest)

So when I provide TestDataNested.serializer instead of classOf[TestDataNested] in my current application, i.e. replace the serializer, it throws "new state serializer is not compatible".

What can I do here? Any help would be greatly appreciated, thanks in advance.

Re: state schema evolution for case classes

Tzu-Li (Gordon) Tai
Hi,

I see the problem that you are bumping into as the following:
- In your previous job, you seem to be falling back to Kryo for the state serialization.
- In your new job, you are trying to change that to use a custom serializer.

You can confirm this by looking at the stack trace of the "new state serializer is not compatible" exception.
Could you maybe post me the stack trace?

If that is indeed the case, I'm afraid that right now, you'll only be able to resolve this using the State Processor API to migrate this serializer offline.
The reason for this is as follows:
- In order to perform the serializer migration at restore time, Flink needs to understand that the old serializer (i.e. the Kryo serializer in your case) is compatible with the new serializer (the custom TestDataNested serializer in your case).
- Currently, Flink cannot reason about serializer compatibility when the serializer changes completely to a different class than before. Therefore, if the serializer class changes, to be safe, Flink right now always assumes that the new serializer is not compatible and therefore fails the restore.

You can manually force this migration offline using the State Processor API, as I said (a rough sketch follows the two steps below):
- The steps would be to load the previous savepoint, and when reading your `testdata-join` state values, use the previous way of providing the serializer (i.e. classOf[TestDataNested]).
- Then, bootstrap a new savepoint with the state values read from `testdata-join`. You may use whatever new serializer you want to write the state into the new savepoint.
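
To make the second step concrete, here is a rough sketch of the bootstrap/write side (again State Processor API, so Flink 1.9+; the uid, paths, key type and the input DataSet are placeholders, and in a real migration the DataSet would come from readKeyedState on the loaded old savepoint):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.state.api.{OperatorTransformation, Savepoint}
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction

// Hypothetical stand-in for the state type kept in the "testdata-join" ValueState.
case class TestDataNested(id: String, value: Long)

// Writes each input record into the "testdata-join" ValueState of the new savepoint.
class TestDataNestedBootstrapper extends KeyedStateBootstrapFunction[String, TestDataNested] {
  @transient private var state: ValueState[TestDataNested] = _

  override def open(parameters: Configuration): Unit = {
    // Placeholder descriptor: in the real migration, pass the new (e.g. Avro-backed) serializer here.
    val descriptor = new ValueStateDescriptor[TestDataNested]("testdata-join", classOf[TestDataNested])
    state = getRuntimeContext.getState(descriptor)
  }

  override def processElement(value: TestDataNested,
                              ctx: KeyedStateBootstrapFunction[String, TestDataNested]#Context): Unit = {
    state.update(value)
  }
}

object WriteNewSavepoint {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Placeholder input; in practice this comes from readKeyedState on the loaded old savepoint.
    val migrated = env.fromElements(TestDataNested("key-1", 1L), TestDataNested("key-2", 2L))

    val transformation = OperatorTransformation
      .bootstrapWith(migrated)
      .keyBy(new KeySelector[TestDataNested, String] {
        override def getKey(value: TestDataNested): String = value.id
      })
      .transform(new TestDataNestedBootstrapper)

    Savepoint
      .create(new RocksDBStateBackend("file:///tmp/checkpoints"), 128) // 128 = max parallelism of the new savepoint
      .withOperator("join-operator", transformation)
      .write("file:///tmp/new-savepoint")

    env.execute("bootstrap new savepoint")
  }
}

The new job can then be started from the freshly written savepoint using the descriptor that carries the new serializer.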

As a side note, I have been thinking about two options that would allow an easier path for users to do this:
Option #1: The Kryo serializer should assume that new serializers are always compatible, given that the target serialized class is the same (which is true for your case). This would allow users to opt out of Kryo serialization, which has always just been a fallback that many users did not realize they were using when Flink cannot interpret the state type.
Option #2: Maybe add a "force-migration" option when restoring from savepoints. This would essentially be an online version of the State Processor API process I explained above, but instead of happening offline, the migration would happen at restore time.

TL;DR: for now, I would suggest trying the State Processor API to migrate the serializer for your specific case.

Cheers,
Gordon
