Custom Class for state checkpointing

Rico Bergmann
Hi!
Is it possible to use a custom class as the checkpointed state?
I'm using the file state handler at the JobManager and implemented the Checkpointed interface.

I tried this and got an exception:

Error: java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:544)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.ottogroup.bi.searchlab.searchsessionizer.OperatorState
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.runtime.state.ByteStreamStateHandle.getState(ByteStreamStateHandle.java:63)
    at org.apache.flink.runtime.state.ByteStreamStateHandle.getState(ByteStreamStateHandle.java:33)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreInitialState(AbstractUdfStreamOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.setInitialState(StreamTask.java:276)
    at org.apache.flink.runtime.state.StateUtils.setOperatorState(StateUtils.java:51)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:541)

Re: Custom Class for state checkpointing

rmetzger0
Hi Rico,

I'm pretty sure that this is a valid bug you've found, since this case is not yet tested (afaik).
We'll fix the issue asap. Until then, are you able to encapsulate your state in something that is available in Flink, for example a TupleX, or just serialize it yourself into a byte[]?

On Tue, Aug 18, 2015 at 11:32 AM, Rico Bergmann <[hidden email]> wrote:
Hi!
Is it possible to use your own class?
I'm using the file state handler at the Jobmanager and implemented the Checkpointed interface. 


Re: Custom Class for state checkpointing

Márton Balassi
In reply to this post by Rico Bergmann
Hey Rico,

Currently, the Checkpointed interface has the limitation that the return type of the snapshotState method (the generic parameter of Checkpointed) has to be Java Serializable. I suspect that is the problem here. We plan to lift this limitation soon.
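
To make the "Java Serializable" requirement concrete, here is a minimal sketch in plain Java, without any Flink dependency. The class names SessionState, BrokenState and SerializableCheck are hypothetical and only serve as illustration. The check simply tries to write the object with an ObjectOutputStream, which fails as soon as any reachable member is not Serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

// Hypothetical state class: it is Serializable and so are all of its fields.
class SessionState implements Serializable {
    private static final long serialVersionUID = 1L;
    final HashMap<String, Long> lastSeen = new HashMap<>();
    long processed;
}

// Hypothetical counter-example: declares Serializable but holds a member that is not.
class BrokenState implements Serializable {
    private static final long serialVersionUID = 1L;
    final Object lock = new Object(); // java.lang.Object does not implement Serializable
}

class SerializableCheck {
    /** Returns true if Java serialization accepts the whole object graph rooted at obj. */
    static boolean isJavaSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            // Some object reachable from obj does not implement Serializable.
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A check like this only covers the writing side. It would not detect a class loading problem that shows up when the state is read back.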

Marton


Re: Custom Class for state checkpointing

Rico Bergmann
In reply to this post by rmetzger0
Hi!

Using TupleX is not possible since the state is very big (a Hashtable). 

How would I serialize it into a byte array?

Greets. Rico. 




Re: Custom Class for state checkpointing

Rico Bergmann
In reply to this post by Márton Balassi
Hi Marton. 

I think this is more of a class loader issue. My custom class implements Serializable, and so do the classes of all its members.

Greets. Rico. 




Re: Custom Class for state checkpointing

rmetzger0
In reply to this post by Rico Bergmann
Java's HashMap is serializable.
If it is only the map, you can just use the HashMap<> as the state.

If you have more data, you can use TupleX, for example:

new Tuple2<HashMap<Integer, String>, Long>(myMap, myLong);
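
One way to read this suggestion: the stack trace in the first post goes through the application class loader (sun.misc.Launcher$AppClassLoader), which presumably cannot see classes that exist only in the job's jar. HashMap ships with the JDK and TupleX with Flink itself, so both should resolve there. The sketch below is plain Java with a hypothetical helper name (it is not a Flink API) and checks whether a class name is visible to the system class loader. The second class name in main is made up for illustration.

```java
// Hypothetical helper, not part of Flink: checks class visibility from the system class loader.
class StateTypeVisibility {
    /** Returns true if the named class can be loaded by the JVM's system (application) class loader. */
    static boolean visibleToSystemLoader(String className) {
        try {
            // initialize=false: only resolve the class, do not run its static initializers.
            Class.forName(className, false, ClassLoader.getSystemClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // JDK class, expected to be visible from any class loader.
        System.out.println(visibleToSystemLoader("java.util.HashMap"));
        // Made-up user class name; the result depends on what is on the classpath.
        System.out.println(visibleToSystemLoader("com.example.userjob.MyOperatorState"));
    }
}
```

Note that the same reasoning would apply to the keys and values stored in the map: if they are instances of user classes, reading them back would likely hit the same ClassNotFoundException.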



Re: Custom Class for state checkpointing

Stephan Ewen
Yep, that is a valid bug!
State is apparently not resolved with the correct classloader.

As a workaround, you can checkpoint byte arrays and serialize/deserialize the state to and from byte arrays yourself. You can use the Apache Commons SerializationUtils class or Flink's InstantiationUtil class for that.

You can get the ClassLoader for the user code (needed for deserialization) via "getRuntimeContext().getUserCodeClassLoader()".
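
The workaround described above could look roughly like the following sketch. It uses only the JDK so that it is self-contained. The names ClassLoaderObjectInputStream and StateBytes are hypothetical, and the utility classes mentioned above could take their place. The assumption is that the byte[] returned by toBytes is what the function hands to the checkpoint, and that on restore the function passes the user code class loader to fromBytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical helper: an ObjectInputStream that resolves classes through a given class loader.
class ClassLoaderObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    ClassLoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        try {
            // Look the class up in the supplied loader (for example the user code class loader).
            return Class.forName(desc.getName(), false, loader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default resolution, which also handles primitive types.
            return super.resolveClass(desc);
        }
    }
}

// Hypothetical helper: converts state to and from the byte[] that gets checkpointed.
class StateBytes {
    static byte[] toBytes(Serializable state) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(state);
        }
        return buffer.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static <T> T fromBytes(byte[] bytes, ClassLoader loader) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                 new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), loader)) {
            return (T) in.readObject();
        }
    }
}
```

Because the checkpointed type is then just byte[], the runtime never has to resolve the user class itself. Only the function's own restore code does, using the loader it was given.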

Let us know if that workaround works. We'll try to get a fix for that out very soon!

Greetings,
Stephan




Re: Custom Class for state checkpointing

rmetzger0
I created a JIRA for the issue: https://issues.apache.org/jira/browse/FLINK-2543

Once I'm done with the Kafka pull request, I'll look into this.


Re: Custom Class for state checkpointing

rmetzger0
I'm still working on writing a test case for reproducing the issue.
Which Flink version are you using?
If you are using 0.10-SNAPSHOT, which exact commit?


Re: Custom Class for state checkpointing

Rico Bergmann
The head revision of 0.10-SNAPSHOT.




Re: Custom Class for state checkpointing

Rico Bergmann
In reply to this post by Stephan Ewen
Hi. 

Thanks for the tip. It seems to work...

Greets. 



Am 18.08.2015 um 13:56 schrieb Stephan Ewen <[hidden email]>:

Yep, that is a valid bug!
State is apparently not resolved with the correct classloader.

As a workaround, you can checkpoint byte arrays and serialize/deserialize the state into byte arrays yourself. You can use the apache commons SerializationUtil class, or Flinks InstantiationUtil class for that.

You can get the ClassLoader for the user code (needed for deserialization) via "getRuntimeContext().getUserCodeClassLoader()".

Let us know if that workaround works. We'll try to get a fix for that out very soon!

Greetings,
Stephan



On Tue, Aug 18, 2015 at 12:23 PM, Robert Metzger <[hidden email]> wrote:
Java's HashMap is serializable.
If it is only the map, you can just use the HashMap<> as the state.

If you have more data, you can use TupleX, for example:

Tuple2<HashMap<Integer, String>, Long>(myMap, myLong);


On Tue, Aug 18, 2015 at 12:21 PM, Rico Bergmann <[hidden email]> wrote:
Hi!

Using TupleX is not possible since the state is very big (a Hashtable). 

How would I have to do serialization into a byte array?

Greets. Rico. 



Am 18.08.2015 um 11:44 schrieb Robert Metzger <[hidden email]>:

Hi Rico,

I'm pretty sure that this is a valid bug you've found, since this case is not yet tested (afaik).
We'll fix the issue asap, until then, are you able to encapsulate your state in something that is available in Flink, for example a TupleX or just serialize it yourself into a byte[] ?

On Tue, Aug 18, 2015 at 11:32 AM, Rico Bergmann <[hidden email]> wrote:
Hi!
Is it possible to use your own class?
I'm using the file state handler at the Jobmanager and implemented the Checkpointed interface. 

I tried this and got an exception:

Error: java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:544)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.ottogroup.bi.searchlab.searchsessionizer.OperatorState
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.runtime.state.ByteStreamStateHandle.getState(ByteStreamStateHandle.java:63)
at org.apache.flink.runtime.state.ByteStreamStateHandle.getState(ByteStreamStateHandle.java:33)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreInitialState(AbstractUdfStreamOperator.java:83)
at org.apache.flink.streaming.runtime.tasks.StreamTask.setInitialState(StreamTask.java:276)
at org.apache.flink.runtime.state.StateUtils.setOperatorState(StateUtils.java:51)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:541)




Re: Custom Class for state checkpointing

rmetzger0
We've finally merged the fix for the bug you've reported here (https://issues.apache.org/jira/browse/FLINK-2543).
You should now be able to use the file-based state handle with user classes as well.

Please let us know if you encounter more issues.

On Wed, Aug 19, 2015 at 10:20 AM, Rico Bergmann <[hidden email]> wrote:
Hi. 

Thanks for the tip. It seems to work...

Greets. 



On 18.08.2015 at 13:56, Stephan Ewen <[hidden email]> wrote:

Yep, that is a valid bug!
State is apparently not resolved with the correct classloader.

As a workaround, you can checkpoint byte arrays and serialize/deserialize the state into byte arrays yourself. You can use the Apache Commons SerializationUtils class, or Flink's InstantiationUtil class, for that.

You can get the ClassLoader for the user code (needed for deserialization) via "getRuntimeContext().getUserCodeClassLoader()".
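
For readers following along, here is a minimal sketch of that workaround using only the JDK. It serializes a state object into a byte[] and reads it back through an ObjectInputStream that resolves classes against an explicitly supplied ClassLoader. The class and method names (ByteArrayStateSketch, toBytes, fromBytes) are made up for illustration and are not part of Flink. Inside a rich function, the loader passed to fromBytes would presumably be the one returned by getRuntimeContext().getUserCodeClassLoader(), as described above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.HashMap;

// Hypothetical helper, not a Flink class: turns state into a byte[] and back.
public class ByteArrayStateSketch {

    // ObjectInputStream that looks classes up in the given ClassLoader first.
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader)
                throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Default resolution also covers primitive type names.
                return super.resolveClass(desc);
            }
        }
    }

    // Serialize the state; this byte[] is what would be checkpointed.
    public static byte[] toBytes(Serializable state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        return bytes.toByteArray();
    }

    // Deserialize the state, resolving user classes via the supplied loader.
    @SuppressWarnings("unchecked")
    public static <T> T fromBytes(byte[] data, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                new ByteArrayInputStream(data), classLoader)) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashMap<Integer, String> state = new HashMap<>();
        state.put(1, "session-a");

        byte[] snapshot = toBytes(state);

        // Stand-in for the user code class loader in this standalone demo.
        ClassLoader loader = ByteArrayStateSketch.class.getClassLoader();
        HashMap<Integer, String> restored = fromBytes(snapshot, loader);

        System.out.println(restored.equals(state));
    }
}
```

With this approach the checkpointed type is just byte[], which the framework can always load, so the user class only has to be resolved inside your own restore code, where the right ClassLoader is available.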

Let us know if that workaround works. We'll try to get a fix for that out very soon!

Greetings,
Stephan


