Hi
We've got a situation where we're merging several Kafka streams, and for certain streams, we want to retain up to 6 days of history. We're trying to figure out how we can migrate savepoint data between application updates when the data type of a state buffer changes.

Let's assume that we have 2 streams with the following data types:

    case class A(id: String, name: String)
    case class B1(id: String, price: Double)

We have a CoProcessFunction which combines the 2 streams and maintains 2 different buffer states: MapState[String, A] and ValueState[B1].

In our scenario, we're trying to anticipate the data type of B1 changing in the future. Let's assume that, in the foreseeable future, B1 will change to:

    case class B2(id: String, price: Double, date: String)

When we create a savepoint using B1 and then upgrade the application to B2, the obvious attempt would be to try and retrieve both the stored ValueState and the new ValueState:

    val oldState = getRuntimeContext.getState(
      new ValueStateDescriptor[B1]("1Buffer", createTypeInformation[B1]))
    val newState = getRuntimeContext.getState(
      new ValueStateDescriptor[B2]("2Buffer", createTypeInformation[B2]))

However, as soon as you do, the following error occurs:

    Unable to restore keyed state [aBuffer]. For memory-backed keyed state, the previous
    serializer of the keyed state must be present; the serializer could have been removed
    from the classpath, or its implementation have changed and could not be loaded. This is
    a temporary restriction that will be fixed in future versions.

Our assumption is that the process operator has a specified ID which Flink uses to save and restore savepoints. The CoProcessFunction's types changed from CoProcessFunction[A, B1, A] to CoProcessFunction[A, B2, A], and therefore the savepoint data does not apply to the operator anymore. Is this assumption correct?

We've been going through the documentation and source code of Flink, and it seems like there's no way to achieve this kind of migration.
If this is the case, we'd be interested in contributing to Flink to get this added a.s.a.p. and would love to get some feedback on how to approach this.

Thanks in advance

Marc

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
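[Editor's sketch] The migration step described above, reading the old state into the new one, boils down to a plain function from the old case class to the new one. A minimal sketch (the `migrate` helper and the empty default for `date` are assumptions for illustration, not part of the original job):

```scala
// Old and new shapes of the buffered state.
case class B1(id: String, price: Double)
case class B2(id: String, price: Double, date: String)

object StateMigration {
  // Hypothetical helper: lift a B1 restored from the old state buffer
  // into the new B2 shape. The empty date string is a placeholder default.
  def migrate(old: B1): B2 = B2(old.id, old.price, date = "")
}
```

Inside the CoProcessFunction, the idea would then be: on first access after the upgrade, read `oldState`, write `StateMigration.migrate(...)` into `newState`, and clear `oldState`.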
Hi Marc,
I assume you have set a UID for your CoProcessFunction as described in [1]? Also, can you provide the Flink version you are working with and the serializer you are using?

If you have the UID set, your strategy seems to be the same as the one proposed by [2]: "Although it is not possible to change the data type of operator state, a workaround to overcome this limitation can be to define a second state with a different data type and to implement logic to migrate the state from the original state into the new state."

I'm no expert on this, but it looks like it should work (although I'm curious where the "aBuffer" in the error message comes from). I'm forwarding this to Gordon in CC because he probably knows better, as he was involved in state migration before (afaik).

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/upgrading.html#application-state-compatibility
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/upgrading.html#stateful-operators-and-user-functions

On Wednesday, 20 September 2017 14:16:27 CEST mrooding wrote:
Hi!
The exception you have bumped into indicates that, on restore of the savepoint, the serializer for that registered state in the savepoint no longer exists. This prevents restoring savepoints taken with memory state backends because there will be no serializer available to deserialize the state at restore time.

My current guess of what is happening is that the generated serializers for case class B1 have different generated names, and therefore, in your modified repackaged job, it would be as if the previous serializer no longer exists in the classpath. The serializer generation for Scala case classes (the `createTypeInformation[B1]` call) depends on some Scala macros, and the resulting generated anonymous class is sensitive to quite a few factors.

Could you perhaps try to verify this by checking the class name of the generated serializers (you can get the serializer via `createTypeInformation[B1].createSerializer(new ExecutionConfig())`)? If they are different for the same case class across 2 different compilations of your job (one with the B2 case class, one without), then my assumption is correct. Otherwise, we would of course need to look deeper.

I'll also think about how best to work around this for now and get back to you.

Cheers,
Gordon
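[Editor's sketch] The point about compiler-generated names can be illustrated with plain Scala, independent of Flink's macros: two structurally identical anonymous classes in the same compilation unit already receive distinct synthetic names assigned by position in the source, so adding or removing code elsewhere can shift them. (The `Runnable` example here is purely illustrative, not Flink's macro output.)

```scala
object AnonNames {
  // Two structurally identical anonymous classes...
  val first: Runnable  = new Runnable { def run(): Unit = () }
  val second: Runnable = new Runnable { def run(): Unit = () }

  // ...still get distinct compiler-generated names (e.g. AnonNames$$anon$1
  // and AnonNames$$anon$2), assigned by their position in the source file.
  def names: (String, String) = (first.getClass.getName, second.getClass.getName)
}
```

The same position-sensitivity in the macro-generated serializer classes would explain why a recompiled job no longer finds the serializer class recorded in the savepoint.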
Gordon
Thanks for the detailed response. I have verified your assumption, and that is, unfortunately, the case.

I also looked into creating a custom Kryo serializer, but I got stuck on serializing arrays of complex types. It seems like this isn't trivial at all with Kryo.

As an alternative, I've been looking into using Avro only for the Flink buffers. Basically, as a first step, we'd still be sending JSON messages through Kafka, but we would use a custom TypeSerializer that converts the case classes to bytes using Avro and vice versa. Unfortunately, documentation is really scarce. In a different topic, http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Serialization-performance-td12019.html, it says that Avro is a bit of an odd citizen and that the AvroSerializer provided by Flink uses Kryo. This confirms what I've found by going through the source code of Flink myself.

I hope that you can provide me with some pointers. Is extending TypeSerializer[T] the best way forward if we only want to use Avro for state buffers and thus utilize Avro's schema migration facilities? Any pointers would be greatly appreciated!

Kind regards

Marc
Hi,

Yes, the AvroSerializer currently still partially uses Kryo for object copying. Also, right now, I think the AvroSerializer is only used when the type is recognized as a POJO and `isForceAvroEnabled` is set on the job configuration. I'm not sure if that is always possible. As mentioned in [1], we would probably need to improve the user experience for Avro usage.

For now, if you want to use Avro directly and only for serializing your state, AFAIK the most straightforward approach would be, as you mentioned, to implement a custom TypeSerializer that uses the Avro constructs. Flink's AvroSerializer actually already sort of does this, so you can refer to that implementation as a baseline.

Cheers,
Gordon

On 5 October 2017 at 4:39:10 PM, mrooding ([hidden email]) wrote:
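[Editor's sketch] Whatever serialization library ends up inside it, the core contract of a hand-rolled state serializer is that `serialize` and `deserialize` are exact inverses over a byte stream. A minimal round-trip sketch using only JDK streams (this is not Flink's TypeSerializer API, just the read/write symmetry such an implementation must preserve; Avro's DatumWriter/DatumReader would take the place of the manual field writes):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

case class B1(id: String, price: Double)

object B1Bytes {
  // Write the fields in a fixed order...
  def serialize(value: B1): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeUTF(value.id)
    out.writeDouble(value.price)
    out.flush()
    bytes.toByteArray
  }

  // ...and read them back in exactly the same order.
  def deserialize(data: Array[Byte]): B1 = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    B1(in.readUTF(), in.readDouble())
  }
}
```

Schema migration then amounts to keeping the old read path around (or, with Avro, letting schema resolution handle it) while the write path emits the new shape.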
Hi Gordon
I've been looking into creating a custom AvroSerializer without Kryo which would support Avro schemas, and I'm starting to wonder if this is actually the most straightforward way to do it. If I extend a class from TypeSerializer, I would also need to implement a TypeInformation class to be able to provide my serializer. Implementing all these classes seems to be quite the ordeal without proper documentation.

Are you sure that this is the right way forward and that there's no other option for using Avro serialization with schema support in Flink?

Thanks again

Marc
Actually, Flink 1.4 will come with improved Avro support. See especially:
- https://issues.apache.org/jira/browse/FLINK-7420: Move All Avro Code to flink-avro
- https://issues.apache.org/jira/browse/FLINK-7997: Avro should be always in the user code
- https://issues.apache.org/jira/browse/FLINK-6022: Improve support for Avro GenericRecord

This makes AvroTypeInfo and AvroSerializer quite usable.