Hi guys,
I have a Flink streaming job running (version 1.2.0) with the following state:

    private transient ValueState<Map<String, Set<User>>> userState;

configured like this:

    final ValueStateDescriptor<Map<String, Set<User>>> descriptor =
            new ValueStateDescriptor<>("userState", TypeInformation.of(new UserTypeHint()));
    userState = getRuntimeContext().getState(descriptor);

The User class looks like this:

    public class User {
        private String id;
        private String firstName;
        private String lastName;
    }

Now we are trying to add one more field to the User object (for example, emailAddress), but apparently it didn't work. I am getting the following exception:

    018-03-13 13:26:13,357 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job CountJob (cbada55d435571e8b24313196204f8ab) switched from state RUNNING to FAILING.
    com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 58, Size: 4
    Serialization trace:
    type (com.example.User)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
        at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:82)
        at com.example.TimestampUserCoFlatMapFunction.flatMap1(TimestampUserCoFlatMapFunction.java:33)
        at com.example.TimestampUserCoFlatMapFunction.flatMap1(TimestampUserCoFlatMapFunction.java:27)
        at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:242)
        at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.IndexOutOfBoundsException: Index: 58, Size: 4
        at java.util.ArrayList.rangeCheck(ArrayList.java:653)
        at java.util.ArrayList.get(ArrayList.java:429)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
        ... 15 more

Thanks,
Konstantin

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi,
I'm afraid Flink does not currently support changing the schema of state when restoring from a savepoint.

Best,
Aljoscha

> On 13. Mar 2018, at 07:36, kla <[hidden email]> wrote:
> [...]
Hi Aljoscha,
Thanks for your reply.

Do you have a suggestion for how we can work around this? We have a production system running on Flink, and we must add one more field to the state.

Could we perhaps write our own serializer?

Thanks,
Konstantin
Hi,

Flink has supported upgrading serializers [1] [2] since version 1.3.

Best,
Fabian

2018-03-14 10:03 GMT+01:00 kla <[hidden email]>:
> Hi Aljoscha,
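Fabian's pointer refers to Flink's own serializer-upgrade mechanism, which is not shown here. To illustrate what Konstantin's "own serializer" idea would need, namely a format that can tell old records from new ones, here is a plain-Java sketch (no Flink API) of a version-tagged encoding for User: every record starts with a version byte, and the reader returns emailAddress as null for version-1 records. The class name, the version constants and the byte layout are all hypothetical. Note that this only helps for data written in this format from the start; it cannot read the Kryo-written bytes already in the 1.2.0 savepoint, which still need a migration such as the one Kostas describes below.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class VersionedUserSerializerSketch {

    // Hypothetical User with the new emailAddress field.
    static final class User {
        final String id, firstName, lastName;   // assumed non-null
        final String emailAddress;              // null for records written before the field existed

        User(String id, String firstName, String lastName, String emailAddress) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
            this.emailAddress = emailAddress;
        }
    }

    static final int VERSION_1 = 1; // id, firstName, lastName
    static final int VERSION_2 = 2; // id, firstName, lastName, optional emailAddress

    // Writes a record in the given format version; every record starts with a version byte.
    static byte[] write(User u, int version) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeByte(version);
            out.writeUTF(u.id);
            out.writeUTF(u.firstName);
            out.writeUTF(u.lastName);
            if (version == VERSION_2) {
                out.writeBoolean(u.emailAddress != null); // presence flag for the optional field
                if (u.emailAddress != null) {
                    out.writeUTF(u.emailAddress);
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads either version; version-1 records come back with emailAddress == null.
    static User read(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int version = in.readByte();
            if (version != VERSION_1 && version != VERSION_2) {
                throw new IllegalArgumentException("Unknown User format version: " + version);
            }
            String id = in.readUTF();
            String firstName = in.readUTF();
            String lastName = in.readUTF();
            // Only version 2 carries the presence flag (and possibly the email).
            String email = (version == VERSION_2 && in.readBoolean()) ? in.readUTF() : null;
            return new User(id, firstName, lastName, email);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        User old = read(write(new User("42", "Ada", "Lovelace", null), VERSION_1));
        User current = read(write(new User("7", "Alan", "Turing", "alan@example.com"), VERSION_2));
        System.out.println(old.id + " " + old.emailAddress);         // 42 null
        System.out.println(current.id + " " + current.emailAddress); // 7 alan@example.com
    }
}
```

The presence flag is there because DataOutputStream.writeUTF does not accept null, so an absent email address has to be encoded explicitly.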
Hi Konstantin,
What you could do is write an intermediate job that has both the old ValueState "oldState" and a new one, "newState", with the new format. When an element arrives in this intermediate job, you check whether oldState is empty for that key. If it is null (empty), there is nothing to migrate and you simply process the element as usual. If it is not empty, you apply your migration logic to port oldState to the newState format, store the migrated value in newState, and clear oldState. After the migration you process the element as usual, using only the new state.

If at some point you are sure that you have seen all the keys from the previous version of the code, then you can be sure that all old-format states have been migrated. At that point you can take a savepoint, remove the migration logic from the job, and resume from the savepoint with the new code. If there is no point where you can be sure that the state of every key has been migrated, then you just let your job keep running like this, i.e. with the migration logic in place.

The problem with this strategy, in the case where you never reach a point where you can be sure you have seen all keys, is that if you want to migrate again in the future, you will have to implement the same thing but migrate from two different previous versions. At that point, however, you may have a policy that says: if I have not seen a key in the last week or month, I no longer consider it active and I do not care about it.

I hope this helps!

Cheers,
Kostas

> On Mar 14, 2018, at 10:03 AM, kla <[hidden email]> wrote:
> [...]