State serialization problem

State serialization problem

kla
Hi guys,

I have a Flink streaming job running (version 1.2.0) which has the
following state:

private transient ValueState<Map<String, Set<User>>> userState;

With the following configuration:

final ValueStateDescriptor<Map<String, Set<User>>> descriptor =
        new ValueStateDescriptor<>("userState",
                TypeInformation.of(new UserTypeHint()));
userState = getRuntimeContext().getState(descriptor);
And the User class looks like the following:

public class User {

    private String id;

    private String firstName;

    private String lastName;

}

Now we are trying to add one more field to the User object (for example,
emailAddress). But apparently it does not work; I am getting the following
exception:

2018-03-13 13:26:13,357 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job CountJob (cbada55d435571e8b24313196204f8ab) switched from state RUNNING to FAILING.
com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 58, Size: 4
Serialization trace:
type (com.example.User)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
        at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:82)
        at com.example.TimestampUserCoFlatMapFunction.flatMap1(TimestampUserCoFlatMapFunction.java:33)
        at com.example.TimestampUserCoFlatMapFunction.flatMap1(TimestampUserCoFlatMapFunction.java:27)
        at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:242)
        at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IndexOutOfBoundsException: Index: 58, Size: 4
        at java.util.ArrayList.rangeCheck(ArrayList.java:653)
        at java.util.ArrayList.get(ArrayList.java:429)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
        ... 15 more


Thanks,
Konstantin




Re: State serialization problem when we add a new field in the object

Aljoscha Krettek
Hi,

I'm afraid Flink currently does not support changing the schema of state when restoring from a savepoint.

Best,
Aljoscha



Re: State serialization problem when we add a new field in the object

kla
Hi Aljoscha,

Thanks for your reply.

Do you have a suggestion for how we can work around it?

We have a production system running with Flink, and it is mandatory for us to
add one more field to the state.

Maybe we could somehow write our own serializer?

Thanks,
Konstantin




Re: State serialization problem when we add a new field in the object

Fabian Hueske-2
Hi,

Flink has supported upgrading serializers [1] [2] since version 1.3.
You would need to upgrade to Flink 1.3 before you can use that feature.

Best, Fabian
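
One interim workaround, sketched here purely as an illustration and not as the serializer-upgrade API referred to above: register Kryo's CompatibleFieldSerializer for the User type. That serializer stores field names alongside the data, so state written after the registration tolerates fields that are added later; it does not make an existing savepoint written with Kryo's default FieldSerializer readable with the changed class. The class and job names below are placeholders.

import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.example.User;

public class RegisterCompatibleUserSerializer {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hand User instances to Kryo's CompatibleFieldSerializer, which writes
        // field names with the serialized data and therefore tolerates fields
        // added to User later on (e.g. emailAddress).
        env.getConfig().addDefaultKryoSerializer(User.class, CompatibleFieldSerializer.class);

        // ... build the rest of the job on env as before ...
    }
}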


Re: State serialization problem when we add a new field in the object

Kostas Kloudas
Hi Konstantin,

What you could do is write an intermediate job that has the old ValueState “oldState”
and the new one, “newState”, with the new format.

When an element comes in to this intermediate job, you check whether the oldState is empty for that key or not.
If it is null (empty), you simply process the element as if it were the first time you see the key.
If it is not empty, then you implement your migration logic that ports the oldState to the newState format,
store the migrated state in the newState, and delete it from the oldState.
Of course, after the migration you process the element as usual, but only use the new state.

If at some point you are sure that you have seen all the keys from the previous version of the code,
then at that point you can be sure that all the old-format states have been migrated, and you can take
a savepoint, clean up the job from the migration logic, and resume from the savepoint with
the new code.

If there is no such point where you can be sure that you have migrated the state for all keys, then you
just let your job run like this, i.e. with the migration logic.

The problem with the above strategy is that, if there is no point where you can be sure that you
have seen all keys and you want to migrate once again in the future, you will have to implement
the same thing but migrating from two different previous versions. But at that point you may have a policy
that says: if I have not seen a key for the last week or month, then I do not consider it active and I do not
care about it.
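
A minimal sketch of this pattern, under a few assumptions: the function is keyed and shaped like the TimestampUserCoFlatMapFunction from the stack trace (the input and output types below are placeholders), the old User class is kept unchanged so the existing state can still be deserialized, a new UserV2 class carries the extra emailAddress field, and both classes have plain getters and setters. None of these names come from the actual job.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

import com.example.User;

public class MigratingUserCoFlatMapFunction
        extends RichCoFlatMapFunction<Long, UserV2, UserV2> {

    // Old-format state: same name and same (unchanged) User class as before,
    // so the data written by the previous job version can still be read.
    private transient ValueState<Map<String, Set<User>>> oldState;

    // New-format state under a new name, using the extended UserV2 class.
    private transient ValueState<Map<String, Set<UserV2>>> newState;

    @Override
    public void open(Configuration parameters) {
        oldState = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "userState",
                TypeInformation.of(new TypeHint<Map<String, Set<User>>>() {})));
        newState = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "userStateV2",
                TypeInformation.of(new TypeHint<Map<String, Set<UserV2>>>() {})));
    }

    @Override
    public void flatMap1(Long timestamp, Collector<UserV2> out) throws Exception {
        migrateIfNeeded();
        // ... the original flatMap1 logic, reading and writing newState only ...
    }

    @Override
    public void flatMap2(UserV2 user, Collector<UserV2> out) throws Exception {
        migrateIfNeeded();
        // ... the original flatMap2 logic, reading and writing newState only ...
    }

    // If the current key still has old-format state, port it to the new format,
    // store it in newState, and clear the old entry.
    private void migrateIfNeeded() throws Exception {
        Map<String, Set<User>> old = oldState.value();
        if (old == null) {
            return; // first time we see this key, or already migrated
        }
        Map<String, Set<UserV2>> migrated = new HashMap<>();
        for (Map.Entry<String, Set<User>> entry : old.entrySet()) {
            Set<UserV2> users = new HashSet<>();
            for (User u : entry.getValue()) {
                UserV2 v2 = new UserV2();
                v2.setId(u.getId());
                v2.setFirstName(u.getFirstName());
                v2.setLastName(u.getLastName());
                // emailAddress stays null until it arrives with new input
                users.add(v2);
            }
            migrated.put(entry.getKey(), users);
        }
        newState.update(migrated);
        oldState.clear();
    }
}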

I hope this helps!

Cheers,
Kostas
