Hi, Flink newbie here.
I played with the API (built from GitHub master) and encountered some issues, but I am not sure whether they are limitations or actually by design:

1. The DataStream reduce method does not take a RichReduceFunction. The code compiles but throws a runtime exception when submitted. [My intent is to maintain a MapState, more below.]

2. Flink seems to be picky about where MapState is used at runtime. MapState is restricted to keyed streams and cannot be used with certain operators. However, I might need to maintain a MapState as (persistent) keyed state for certain processing contexts. [I could use an external KV store via the async I/O API, but I am hoping Flink could maintain the (RocksDB) database instances so I could avoid another layer of external storage.]

Any pointer to a blog/doc/video is greatly appreciated.

Thanks!
Hi,
you are right. There are some limitations around RichReduceFunction on windows. Maybe the new AggregateFunction (`window.aggregate()`) could solve your problem: you can provide an accumulator, which is your custom state that you can update for each record. I couldn't find a documentation page; it might be created in the coming weeks after the feature freeze.

Regarding the MapState, I'm looping in Stefan; maybe he can give you some advice here.

Timo

On 26/04/17 04:25, Sand Stone wrote:
> Hi, Flink newbie here.
> [...]
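The accumulator idea behind `window.aggregate()` can be sketched in plain Java. Note this is only an illustration of the shape, not the actual Flink interface (`org.apache.flink.api.common.functions.AggregateFunction` defines `createAccumulator`/`add`/`getResult`/`merge`, and its exact signatures may differ in the version built from master):

```java
// Sketch only: the accumulator pattern used by window.aggregate().
// The accumulator is the custom per-window state updated for each record.
public class RunningAverage {

    // Accumulator type: holds the partial aggregate.
    static final class Acc {
        long sum;
        long count;
    }

    // Called once per window to create fresh state.
    static Acc createAccumulator() {
        return new Acc();
    }

    // Called for each incoming record; updates the accumulator.
    static Acc add(long value, Acc acc) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    // Called when the window fires; extracts the final result.
    static double getResult(Acc acc) {
        return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
    }

    public static void main(String[] args) {
        Acc acc = createAccumulator();
        for (long v : new long[] {1, 2, 3, 4}) {
            acc = add(v, acc);
        }
        System.out.println(getResult(acc)); // 2.5
    }
}
```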
To be clear, I like the direction Flink is going with state:
Queryable State, MapState, etc. MapState in particular is a great feature, and I am trying to find more documentation and/or usage patterns for it before I dive into the deep end of the code. As far as I can tell, the key in MapState does not have to be associated with the key of the keyed stream. So in theory, I should be able to use MapState almost anywhere that accepts "RichXXX" functions.

Also, I wonder if it makes sense to have "global state" (stored in a RocksDB backend) instantiated by the Env and maintained by the JobManager. Sure, state access would go over RPC, but the database lifetime would be maintained by the Flink cluster. Right now I think I could use a "long-running" job exposing Queryable State to emulate this.

Thanks!

On Wed, Apr 26, 2017 at 8:01 AM, Timo Walther <[hidden email]> wrote:
> Hi,
> [...]
Hi,
you can imagine the internals of keyed map state working like a Map<EventKey, Map<UserKey, Value>>, but you only deal with the Map<UserKey, Value> part in your user code. Under the hood, Flink will always present you the map that corresponds to the currently processed event's key. So for each element, it will swap in the corresponding inner map, basically doing the lookup into the outer map by event key for you.

For operator state (which is state that is not scoped by key) there is currently no map state and also no implementation for RocksDB. We might introduce this in the future, but until now it was never really required, because large state is typically keyed. So what you can do is maintain e.g. a java.util.Map<UserKey, Value> yourself and write it to a ListState at checkpointing time. The list aspect of operator state is different from the keyed ListState: list elements are the atoms of state redistribution (think of scaling in or out). So you could store your complete map as one list element, or each entry as one list element, or anything in between, depending on if and how your operator state can be re-sharded. You could take a look at FlinkKafkaConsumerBase::initializeState and FlinkKafkaConsumerBase::snapshotState as an example, where Kafka partition offsets are the operator state and individual offsets become list elements so that they can be individually redistributed.

Best,
Stefan

> Am 26.04.2017 um 17:24 schrieb Sand Stone <[hidden email]>:
> [...]
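The redistribution semantics Stefan describes can be sketched in plain Java (this is not Flink code; the assignment strategy below is a simple round-robin, and Flink's actual assignment on rescaling may differ):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch (not Flink API): list elements of operator state are the atoms of
// redistribution. If each Map<UserKey, Value> entry is checkpointed as one
// list element (like Kafka partition offsets), entries can be re-assigned
// to subtasks individually when the operator scales in or out.
public class RedistributionSketch {

    // Distribute checkpointed list elements over the new number of subtasks.
    static <E> List<List<E>> redistribute(List<E> checkpointedElements, int newParallelism) {
        List<List<E>> subtasks = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int i = 0; i < checkpointedElements.size(); i++) {
            subtasks.get(i % newParallelism).add(checkpointedElements.get(i));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // One operator instance held Map entries k1..k5; each entry became
        // one list element at checkpoint time. After scaling to 2 subtasks:
        List<String> elements = List.of("k1=1", "k2=2", "k3=3", "k4=4", "k5=5");
        List<List<String>> afterScaleOut = redistribute(elements, 2);
        System.out.println(afterScaleOut.get(0)); // [k1=1, k3=3, k5=5]
        System.out.println(afterScaleOut.get(1)); // [k2=2, k4=4]
    }
}
```

Storing the whole map as a single list element would instead force all entries to move together, which is why per-entry elements give finer-grained re-sharding.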
Thanks, Stefan. The logical data model of Map<EventKey, Map<UserKey, Value>> makes total sense.

A related question: MapState supports iteration. What's the encoding format at the RocksDB layer? Or rather, how can a user control the user-key encoding? I assume the implementation uses a compound key format: EventKey + UserKey. Let's assume UserKey is an int or long. If big-endian encoding is used, iteration will return UserKeys in the order they are stored in RocksDB.

Thanks!

On Thu, Apr 27, 2017 at 6:34 AM, Stefan Richter <[hidden email]> wrote:
> Hi,
> [...]
The user-key and value encoding is controlled through serializers that can be user-provided. Your assumption is right: RocksDB works like an ordered map, and we concatenate the actual keys as (keygroup_id | elementkey | userkey), mapping to the user value. (Think of a key group as a shard id, functionally dependent on the element key's hash, used to group keys into shards.)
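The ordering property behind the question can be demonstrated in plain Java. RocksDB's default comparator orders keys as unsigned byte strings, so a fixed-width big-endian encoding of non-negative longs sorts the same way as the numbers themselves. (Flink's actual composite key layout and serializers are more involved; this only shows the ordering property, not Flink's encoding.)

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Sketch: big-endian fixed-width encoding preserves numeric order under
// lexicographic unsigned byte comparison (RocksDB's default comparator).
// Note: this holds for non-negative values; negative longs would sort after
// positive ones under unsigned byte order.
public class BigEndianOrder {

    static byte[] encode(long userKey) {
        // ByteBuffer is big-endian by default.
        return ByteBuffer.allocate(Long.BYTES).putLong(userKey).array();
    }

    public static void main(String[] args) {
        byte[] a = encode(5L);
        byte[] b = encode(300L);
        // With little-endian encoding this comparison could go the wrong way;
        // big-endian keeps byte order aligned with numeric order.
        System.out.println(Arrays.compareUnsigned(a, b) < 0); // true: 5 sorts before 300
    }
}
```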
> Am 27.04.2017 um 16:57 schrieb Sand Stone <[hidden email]>:
> [...]