load + update global state


Peter Ertl
Hi folks,

I am coding a streaming task that processes HTTP requests from our website and enriches them with additional information.

That information comes from a lookup table containing session IDs from historic requests and the emails that were used within those sessions in the past.


    lookup - hashtable:     session_id: String => emails: Set[String]


During processing of these NEW HTTP requests:

- the lookup table should be used to get previous emails and enrich the current stream item
- new candidates for the lookup table will be discovered while processing these items and should be added to the lookup table (these changes should also be visible across the cluster)
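To pin down the semantics before distribution enters the picture, here is a minimal single-process Python sketch of the enrich-then-update step. It assumes a hypothetical event shape of (session_id, email) tuples, where email may be None, and uses a plain dict as the lookup table; it deliberately ignores memory limits, cluster-wide visibility and fault tolerance.

```python
from typing import Dict, Optional, Set, Tuple

# Hypothetical event shape: (session_id, email seen in this request, or None)
Event = Tuple[str, Optional[str]]
Enriched = Tuple[str, Optional[str], Set[str]]


def enrich_and_update(lookup: Dict[str, Set[str]], event: Event) -> Enriched:
    session_id, email = event
    # Enrich: copy the emails previously seen in this session, so the
    # update below does not change what was attached to this event.
    previous = set(lookup.get(session_id, set()))
    # Update: record the newly discovered email for later requests.
    if email is not None:
        lookup.setdefault(session_id, set()).add(email)
    return session_id, email, previous


lookup: Dict[str, Set[str]] = {}
events = [("s1", "a@x.org"), ("s2", "b@y.org"), ("s1", "c@x.org"), ("s1", None)]
enriched = [enrich_and_update(lookup, e) for e in events]
for row in enriched:
    print(row)
```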

I see at least the following issues:

(1) Loading the state as a whole from the data store into memory burns a huge amount of memory (making changes visible cluster-wide is also an issue)

(2) Not loading it into memory but using something like Cassandra / Redis as a lookup store would certainly work, but it introduces a lot of network requests (possible ideas: use a distributed cache? broadcast updates in the Flink cluster?)

(3) How should I integrate the changes to the table with Flink's checkpointing?

I really don't see how best to solve this, and my current solution is far from elegant...

So is there any best practice for supporting "large lookup tables that change during stream processing"?

Cheers
Peter





Re: load + update global state

Tzu-Li (Gordon) Tai
Hi Peter!

One thing I’d like to understand first after reading about your use case:
Why exactly do you need the lookup table to be globally accessible? From what I understand, you are using this lookup table to enrich stream events, so whatever processing you need to perform downstream on the enriched stream, you would already have the corresponding information for each session attached.

Regarding a solution for efficient stream enrichment in your case:
In your case, the enrichment data comes from the input events themselves, so it can be fairly straightforward: key the stream by session ID and use a RichMapFunction (keyed state is accessed through the function's runtime context, so the "rich" variant is needed) that keeps the lookup table as managed keyed state [1].
By using RocksDB as your state backend [2], the table would not have to be held in memory, so your state size is bounded by local disk size rather than by memory. Each state access would be scoped to the currently processed key (i.e., in your case the session ID, meaning that you’d only be accessing the email set of that session).
With RocksDB as your state backend, each state access and update requires de-/serialization (of the state of a single key), but that is always a local access and in general outperforms remotely looking up an external store.
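To make the per-key de-/serialization point concrete, here is a rough Python analogy. It is not RocksDB and not Flink's API: the standard-library shelve module stands in for an embedded, disk-backed key-value store that pickles each value on write and unpickles it on read, so a lookup only deserializes the email set of one session. The (session_id, email) event shape and the process helper are hypothetical.

```python
import os
import shelve
import tempfile
from typing import Optional, Set


def process(db: shelve.Shelf, session_id: str, email: Optional[str]) -> Set[str]:
    # Read: only this session's value is loaded from disk and unpickled.
    previous: Set[str] = db.get(session_id, set())
    if email is not None:
        # Write: the updated set is pickled again and stored under the same key.
        db[session_id] = previous | {email}
    return previous


with tempfile.TemporaryDirectory() as tmp:
    # Local, on-disk store: values live in files, not in a big in-memory table.
    with shelve.open(os.path.join(tmp, "lookup")) as db:
        r1 = process(db, "s1", "a@x.org")
        r2 = process(db, "s2", "b@y.org")
        r3 = process(db, "s1", "c@x.org")
        r4 = process(db, "s1", None)
        stored_keys = sorted(db.keys())
```

Note that with writeback=False (shelve's default), mutating the unpickled set in place would not be persisted, which is why the sketch writes the new set back; the same caution applies to any store that hands out deserialized copies.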

So, to wrap this up, when using Flink the answers to your questions would be:

(1) load the state as a whole from the data store into memory is a huge burn of memory (also making changes cluster-wide visible is an issue) 

Apart from the “cluster-wide visibility” aspect, which needs to be clarified, you can use RocksDB as the state backend so that the state is kept on local disk rather than in memory.

(2) not loading into memory but using something like cassandra / redis as a lookup store would certainly work but introduces a lot of network requests (possible ideas: use a distributed cache? broadcast updates in flink cluster?) 

Remote lookup is not required if you keep the lookup table as managed keyed state in Flink. All session lookups would be local state accesses. You can think of it as basically setting up a K-V store within Flink that is always co-partitioned by session ID with your incoming events.
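The co-partitioning idea can be modeled in a few lines of plain Python. This is a sketch of the concept, not Flink's implementation: Flink assigns keys to parallel instances via key groups and its own hashing, whereas here zlib.crc32 modulo a hypothetical parallelism of 4 stands in for keyBy. The point is that all events of one session are routed to the same instance, so that instance's local dict is the only place that session's emails are ever read or written.

```python
import zlib
from typing import Dict, List, Optional, Set, Tuple

PARALLELISM = 4  # hypothetical number of parallel operator instances


def owner(session_id: str) -> int:
    # Stand-in for keyBy: a deterministic hash of the key picks the instance.
    return zlib.crc32(session_id.encode("utf-8")) % PARALLELISM


class Instance:
    """One parallel operator instance holding state only for its own keys."""

    def __init__(self) -> None:
        self.state: Dict[str, Set[str]] = {}

    def process(self, session_id: str, email: Optional[str]) -> Set[str]:
        previous = set(self.state.get(session_id, set()))  # local read
        if email is not None:
            self.state.setdefault(session_id, set()).add(email)  # local write
        return previous


instances: List[Instance] = [Instance() for _ in range(PARALLELISM)]


def route(event: Tuple[str, Optional[str]]) -> Set[str]:
    session_id, email = event
    # Every event of a session lands on the same instance, so no remote lookup is needed.
    return instances[owner(session_id)].process(session_id, email)


events = [("s1", "a@x.org"), ("s2", "b@y.org"), ("s3", None), ("s1", "c@x.org"), ("s2", None)]
results = [route(e) for e in events]
```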

(3) how should I integrate the changes to the table with flink's checkpointing? 

Simply by registering managed keyed state. Flink will checkpoint it for fault tolerance and ensure exactly-once state consistency for you. The “Working with State” docs should hopefully cover that quite well!
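As a rough intuition for why registered state plus checkpointing gives exactly-once state, here is a single-operator Python model. It is a deliberate simplification: Flink takes distributed snapshots using checkpoint barriers and a state backend, while this sketch just pairs a deep copy of the operator's state with the source position it corresponds to. The EnrichOperator class, the run helper and the per-session request counter (added so that double counting would be visible) are all hypothetical.

```python
import copy
from typing import Dict, List, Optional, Set, Tuple

Event = Tuple[str, Optional[str]]


class EnrichOperator:
    def __init__(self) -> None:
        self.emails: Dict[str, Set[str]] = {}
        # Not idempotent: replaying an event without restoring state would over-count.
        self.requests: Dict[str, int] = {}

    def process(self, event: Event) -> None:
        session_id, email = event
        self.requests[session_id] = self.requests.get(session_id, 0) + 1
        if email is not None:
            self.emails.setdefault(session_id, set()).add(email)

    def snapshot(self):
        return copy.deepcopy((self.emails, self.requests))

    def restore(self, snap) -> None:
        self.emails, self.requests = copy.deepcopy(snap)


def run(events: List[Event], checkpoint_every: int, fail_at: Optional[int] = None) -> EnrichOperator:
    op = EnrichOperator()
    checkpoint = (0, op.snapshot())  # (source position, state at that position)
    position, failed = 0, False
    while position < len(events):
        if position == fail_at and not failed:
            # Simulated crash: restore the last checkpoint and rewind the source.
            failed = True
            position, snap = checkpoint
            op.restore(snap)
            continue
        op.process(events[position])
        position += 1
        if position % checkpoint_every == 0:
            checkpoint = (position, op.snapshot())
    return op


events = [("s1", "a@x.org"), ("s2", None), ("s1", "c@x.org"), ("s1", None), ("s2", "b@y.org")]
clean = run(events, checkpoint_every=2)
recovered = run(events, checkpoint_every=2, fail_at=3)
```

Without op.restore(snap), replaying the third event after the simulated crash would bump the s1 counter twice; restoring state and input position together is what keeps each event's effect on state counted once.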


Hope this helps :)

Cheers,
Gordon



On 8 August 2017 at 3:00:57 AM, Peter Ertl ([hidden email]) wrote:

[quoted text hidden]