Is there a way to iterate over all of the key/value entries in the state backend, from within the operator that’s making use of the same?
E.g. I’ve got a ReducingState, and on a timed interval (inside of the onTimer method) I need to iterate over all KV state and emit the N “best” entries. What’s the recommended approach? Thanks, — Ken
Hi Ken,
Unfortunately, iterating over all keys is not currently supported. Do you have your own custom operator (because you mention “from within the operator…”), or do you have a process function (because you mention the “onTimer” method)? Also, could you describe your use case a bit more? Do you have a periodic timer per key, and when the timer for a given key fires, you want access to the state of all keys? Thanks, Kostas
Hi Kostas,
Thanks for responding. Details in-line below.
The main problems I’m trying to solve (without requiring a separate scalable DB infrastructure) are:
- Entries have an associated “earliest processing time”. I don’t want to send these through the system until that time trigger has passed.
- Entries have an associated “score”. I want to favor processing high-scoring entries over low-scoring entries.
- If an entry’s score is too low, I want to archive it, rather than constantly re-evaluate it using the above two factors.
I’ve got my own custom DB that handles the above, and it scales to target sizes of 1B+ entries per server by using a mixture of RAM and disk. But having to checkpoint it isn’t trivial. So I thought that if there were a way to (occasionally) iterate over the keys in the state backend, I could get what I needed with minimum effort. But it sounds like that’s not possible currently. Thanks, — Ken
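[Editor’s sketch, not part of the original thread.] The selection logic described above can be modeled in plain Java, independent of any state backend. All names here (CrawlEntry, selectBest, the threshold values) are illustrative assumptions, not anything from the thread: pick the N highest-scoring entries whose earliest processing time has passed, and divert entries scoring below a threshold to an archive.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

// Hypothetical entry type; field names are made up for illustration.
class CrawlEntry {
    final String key;
    final long earliestTime; // don't process before this timestamp
    final double score;      // higher is better

    CrawlEntry(String key, long earliestTime, double score) {
        this.key = key;
        this.earliestTime = earliestTime;
        this.score = score;
    }
}

class Selector {
    // Return the n best eligible entries; move too-low-scoring ones to archive.
    static List<CrawlEntry> selectBest(Collection<CrawlEntry> entries, long now,
                                       int n, double archiveThreshold,
                                       List<CrawlEntry> archive) {
        List<CrawlEntry> eligible = new ArrayList<>();
        for (CrawlEntry e : entries) {
            if (e.score < archiveThreshold) {
                archive.add(e);      // too low: archive, stop re-evaluating it
            } else if (e.earliestTime <= now) {
                eligible.add(e);     // its "earliest processing time" has passed
            }
        }
        eligible.sort(
            Comparator.comparingDouble((CrawlEntry e) -> e.score).reversed());
        return eligible.subList(0, Math.min(n, eligible.size()));
    }
}
```

The point of the sketch is only that the per-timer work is a filter plus a bounded sort, which is why being able to iterate the backend’s keys would have been sufficient.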
--------------------------
Ken Krugler
+1 530-210-6378
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
Hi Ken,
So you have a queue where elements are sorted by timestamp and score, and when the time (event time, I suppose) passes the timestamp of an element, you want to fetch the element and: if the score is too low, archive it; if the score is OK, emit it. If I understand correctly, then if your stream is keyed you have a queue and an “archive” state per key; if not, you have a global queue for all elements, which can be seen as a keyed stream on a dummy key, right? By the way, timers in Flink have to be associated with a key, so I suppose that since you are using timers you are in the first case (keyed stream). In that case, why do you need access to the state of all the keys? It may also be worth having a look at the CEP operator in the Flink codebase. There you also have a queue per key, where events are sorted by timestamp, and at each watermark, elements with timestamps smaller than the watermark are processed. Hope this helps, Kostas
Hi Kostas,
In my use case I’m keeping track of the state of URLs during a web crawl. This represents both the current state (“URL X should be crawled at time Y, and has an estimated value of Z”), and is the source of URLs to be fed into the crawl infrastructure - it’s a floor wax and a dessert topping. Which is why it’s a process function, so that I can query this “crawlDB” to get URLs to emit to the fetch queue, independent of when/if new URLs are flowing in from some external source. And yes, I could use an external, queryable system to handle this (e.g. Elasticsearch), but at a scale of billions of URLs, having something custom is of significant value in terms of performance and resource costs. There are things I could do to better leverage Flink’s state management, so that I have to do less in this custom DB (archiving low-scoring URLs comes to mind). But after a few whiteboard sessions, it still seems like I’m going to have to add checkpointing/snapshotting support to my custom crawlDB. Thanks, — Ken
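[Editor’s sketch, not part of the original thread.] The core of “adding checkpointing/snapshotting support” to a custom in-memory store is being able to serialize the full state to bytes and rebuild it later. The class below is a minimal, hypothetical model of that core in plain Java; wiring it into Flink’s checkpoint callbacks would be a separate step, and none of these names come from Ken’s actual crawlDB:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Minimal model of snapshot/restore for an in-memory key/value store.
class SnapshottableStore {
    private final Map<String, Long> state = new HashMap<>();

    void put(String key, long value) { state.put(key, value); }
    Long get(String key) { return state.get(key); }

    // Write the full state as a count-prefixed list of entries.
    byte[] snapshot() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(state.size());
            for (Map.Entry<String, Long> e : state.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Rebuild the store from a previously taken snapshot.
    static SnapshottableStore restore(byte[] snapshot) {
        try {
            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(snapshot));
            SnapshottableStore store = new SnapshottableStore();
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                store.put(in.readUTF(), in.readLong());
            }
            return store;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A real crawlDB mixing RAM and disk would need an incremental or copy-on-write variant of this so that a billion-entry snapshot doesn’t block processing, which is presumably why Ken says it isn’t trivial.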