Accessing StateBackend snapshots outside of Flink

Accessing StateBackend snapshots outside of Flink

igor.berman
Hi,
we are evaluating Flink for a new solution, and several people have raised concerns about coupling too tightly to Flink:
1. We understand that if we want full fault tolerance and the best performance, we'll need to use Flink managed state (probably the RocksDB backend, due to the volume of state).
2. But if we later find that Flink doesn't meet our needs (for any reason), we'll need to extract this state somehow (since it's the only source of consistent state).
In general I'd like to be able to take a snapshot of the backend and try to read it... do you think that would be a trivial task?
Say I'm holding list state per partitioned key: would it be easy to take the RocksDB files and open them?

Any thoughts on how I can convince the people on our team?

thanks in advance!
Re: Accessing StateBackend snapshots outside of Flink

Aljoscha Krettek
Hi,
for RocksDB we simply use a TypeSerializer to serialize the key and value to a byte[] array and store that in RocksDB. For a ListState, we serialize the individual elements using a TypeSerializer and store them in a comma-separated list in RocksDB. The snapshots of RocksDB that we write to HDFS are regular backups of a RocksDB database, as described here: https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F. You should be able to read them from HDFS and restore them to a RocksDB database as described in the linked documentation.

tl;dr As long as you know the type of values stored in the state you should be able to read them from RocksDB and deserialize the values using TypeSerializer.

One more bit of information: internally, the state is keyed by (key, namespace) -> value, where the namespace can be an arbitrary type that has a TypeSerializer. We use this to store window state that is local to both the key and the current window. For state that you store in a user-defined function, the namespace will always be null, and it will be serialized by a VoidSerializer that simply always writes a "0" byte.

Cheers,
Aljoscha
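
[Editor's note: the key layout Aljoscha describes can be sketched with plain Java I/O. This is an illustrative simulation only: the class, the method, and the length-prefixed string encoding are assumptions for the sake of the example, not Flink's actual serializers. It shows just the (key, namespace) composite, with the void namespace written as a single 0 byte.]

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of a composite RocksDB key built from (key, namespace),
// with the void namespace appended as a single 0 byte, as described above.
class CompositeKeySketch {

    // Serialize a String key (here: 2-byte length prefix + UTF-8 bytes, an
    // assumption for illustration), then append the void-namespace marker.
    static byte[] encode(String key) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream view = new DataOutputStream(out);
        view.writeUTF(key);  // serialized user key
        view.writeByte(0);   // void namespace: a single "0" byte
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] rocksDbKey = encode("user-42");
        // The last byte is the void-namespace marker.
        System.out.println(rocksDbKey[rocksDbKey.length - 1]); // prints 0
    }
}
```

A tool reading a restored backup would iterate the RocksDB entries and apply the matching TypeSerializer to the value bytes in the same way.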

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Accessing-StateBackend-snapshots-outside-of-Flink-tp6116.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Accessing StateBackend snapshots outside of Flink

Stephan Ewen
One thing to add is that you can always trigger a persistent checkpoint via the "savepoints" feature: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html
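
[Editor's note: per the savepoints documentation linked above, savepoints are triggered and restored via the CLI. A minimal sketch; the job ID, savepoint path, and jar name are placeholders:]

```shell
# Trigger a savepoint for a running job; the command prints the savepoint's path
bin/flink savepoint <jobID>

# Later, resume a job from that savepoint
bin/flink run -s <pathToSavepoint> <jobJar>
```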




Re: Accessing StateBackend snapshots outside of Flink

igor.berman
Thanks a lot for the info; it seems not too complex.
I'll try to write a simple tool to read this state.

Aljoscha, does the key reflect a unique id of the operator in some way? Or is the key just the "name" passed to ValueStateDescriptor?

thanks in advance




Re: Accessing StateBackend snapshots outside of Flink

Aljoscha Krettek
Hi,
key refers to the key extracted by your KeySelector. Right now, for every named state (i.e. the name in the StateDescriptor) there is an isolated RocksDB instance.

Cheers,
Aljoscha



Re: Accessing StateBackend snapshots outside of Flink

Josh
Hello,
I have a follow-up question to this: since Flink doesn't support state expiration at the moment (e.g. expiring state that hasn't been updated for a certain amount of time), would it be possible to clear out old UDF state by:
- storing a "last_updated" timestamp in the state value,
- periodically (e.g. monthly) going through all the state values in RocksDB, deserializing them using the TypeSerializer and reading the "last_updated" property, and
- deleting the key from RocksDB if the state's "last_updated" property is over a month old?

Is there any reason this approach wouldn't work, or anything to be careful of?

Thanks,
Josh
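
[Editor's note: the cleanup pass Josh proposes can be sketched in plain Java, with a Map standing in for the RocksDB column of deserialized state values. The StateValue type and its field names are assumptions for illustration; a real tool would deserialize each RocksDB value with the TypeSerializer first.]

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of a monthly TTL sweep over keyed state.
class TtlSweepSketch {

    static final long ONE_MONTH_MS = 30L * 24 * 60 * 60 * 1000;

    // Stand-in for a deserialized state value carrying a "last_updated" field.
    static class StateValue {
        final long lastUpdated;
        final String payload;
        StateValue(long lastUpdated, String payload) {
            this.lastUpdated = lastUpdated;
            this.payload = payload;
        }
    }

    // Delete every entry whose last_updated is more than a month before `now`.
    static void sweep(Map<String, StateValue> store, long now) {
        store.entrySet().removeIf(e -> now - e.getValue().lastUpdated > ONE_MONTH_MS);
    }

    public static void main(String[] args) {
        Map<String, StateValue> store = new HashMap<>();
        long now = System.currentTimeMillis();
        store.put("fresh", new StateValue(now, "keep me"));
        store.put("stale", new StateValue(now - 2 * ONE_MONTH_MS, "expire me"));
        sweep(store, now);
        System.out.println(store.keySet()); // prints [fresh]
    }
}
```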





Re: Accessing StateBackend snapshots outside of Flink

Maximilian Michels
Hi Josh,

I'm not a RocksDB expert, but the workaround you described should work.
Just bear in mind that accessing RocksDB concurrently with a Flink job
can result in inconsistent state. Make sure to perform atomic
updates and to clear the RocksDB cache for the item.

Cheers,
Max

Re: Accessing StateBackend snapshots outside of Flink

Aljoscha Krettek
Hi Josh,
I think RocksDB does not allow accessing a database instance from more than one process concurrently. Even if it were possible, I would highly recommend not fiddling with Flink state internals (in RocksDB or elsewhere) from the outside. All kinds of things might be going on at any given moment, such as locking of state due to a checkpoint, state restore after a failure, or simple state access.

If you are interested in this, we can work together on adding proper support for TTL (time-to-live) to the Flink state abstraction.

Cheers,
Aljoscha

Re: Accessing StateBackend snapshots outside of Flink

Maximilian Michels
+1 to what Aljoscha said. We should rather fix this programmatically.

Re: Accessing StateBackend snapshots outside of Flink

bwong247
We're currently investigating Flink, and one of the features we'd like to have is a TTL feature to time out older values in state. I saw this thread, and it sounds like the functionality was being considered. Is there any update?

Re: Accessing StateBackend snapshots outside of Flink

Aljoscha Krettek
Hi,
there are two open issues about this:

No work has been done on this yet. You can, however, simulate TTL for state by using a TimelyFlatMapFunction and manually setting a timer for clearing out state (available in Flink 1.2-SNAPSHOT).

Cheers,
Aljoscha
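
[Editor's note: the timer pattern Aljoscha suggests can be illustrated with a self-contained, single-threaded simulation. This is not the Flink TimelyFlatMapFunction API; all names here are hypothetical. The idea is the same, though: every update re-registers an expiry timer for its key, and a firing timer only clears state if no newer update arrived in the meantime.]

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical simulation of per-key TTL via registered timers.
class TimerTtlSketch {
    static final long TTL = 100; // expire a key 100 time units after its last update

    static class Timer implements Comparable<Timer> {
        final long fireAt;
        final String key;
        Timer(long fireAt, String key) { this.fireAt = fireAt; this.key = key; }
        @Override public int compareTo(Timer o) { return Long.compare(fireAt, o.fireAt); }
    }

    final Map<String, String> state = new HashMap<>();
    final Map<String, Long> lastUpdate = new HashMap<>();
    final PriorityQueue<Timer> timers = new PriorityQueue<>();

    // On each element: update state and register an expiry timer for the key.
    void onElement(long time, String key, String value) {
        state.put(key, value);
        lastUpdate.put(key, time);
        timers.add(new Timer(time + TTL, key));
    }

    // Fire all timers up to the given time.
    void advanceTo(long watermark) {
        while (!timers.isEmpty() && timers.peek().fireAt <= watermark) {
            Timer t = timers.poll();
            // Only clear if the key was not refreshed after this timer was set.
            if (lastUpdate.getOrDefault(t.key, Long.MIN_VALUE) + TTL <= t.fireAt) {
                state.remove(t.key);
                lastUpdate.remove(t.key);
            }
        }
    }

    public static void main(String[] args) {
        TimerTtlSketch op = new TimerTtlSketch();
        op.onElement(0, "a", "v1");
        op.onElement(50, "b", "v1");
        op.advanceTo(120);                     // "a"'s timer (fires at 100) expires it
        System.out.println(op.state.keySet()); // prints [b]
    }
}
```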
