Re: How to perform this join operation?

Posted by Aljoscha Krettek on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-perform-this-join-operation-tp6088p6676.html

Hi Elias,
Samza and Flink operate at slightly different abstraction layers here. The Samza KeyValueStore basically has the interface of a Java HashMap. When accessing keyed state the key is always explicit in the method access and it only allows a simple put and get per key. Flink State, such as ValueState, ListState and ReducingState is implicitly scoped to the key of the input element and it allows different "shapes" of state, for example the ListState uses the efficient merge operation to add to the state instead of a get-update-put cycle that would be required in Samza KeyValueStore.

In code, this is what Samza does:

class Operator {
   KeyValueStore<Key, Value> store = ...;

   void process(KV<Key, Value> element) {
      Value value = store.get(element.getKey());
      ...
      store.put(element.getKey(), ...);
   }
}

while this is what Flink does:

class Operator {
   ValueState<Value> state = ...;

   void internalProcess(KV<Key, Value> element) {
      // the framework scopes the state to the element's key
      state.setKey(element.getKey());
      process(element);
   }

   void process(KV<Key, Value> element) {
      Value value = state.get();
      ...
      state.update(...);
   }
}

In Flink we handle the keys internally, which makes it easier for us to implement things like automatic scaling with rebalancing of keyed state (Till is working on this right now). Underneath we have something similar to the KeyValueStore; if you want, you could write a custom operator that deals with these details directly and manages the keys itself. The thing we don't support right now is iterating over all keys/state held locally. I'm changing this, however, in this PR: https://github.com/apache/flink/pull/1957
Then you can do everything you can do with the Samza KeyValueStore, plus a bit more, because we have more specific types of state that exploit features such as merge instead of the get-update-put cycle.
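The difference can be sketched with a small toy model in plain Java (these are stand-in classes, not the real Samza or Flink APIs): the Samza-style store forces you to read and write the whole value per key, while a list-shaped state accepts single-element appends.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not the real Samza/Flink APIs): contrasts a get-update-put
// cycle against an append-style merge for list-shaped state.
public class StateShapes {
    // Samza-style: the whole value is read and written back on every update.
    static class KeyValueStore<K, V> {
        private final Map<K, V> map = new HashMap<>();
        V get(K key) { return map.get(key); }
        void put(K key, V value) { map.put(key, value); }
    }

    // Flink-style list state, already scoped to one key: a single element is
    // appended; the full list never has to be materialized for an update.
    static class ListState<T> {
        private final List<T> elements = new ArrayList<>();
        void add(T value) { elements.add(value); }
        Iterable<T> get() { return elements; }
    }

    public static void main(String[] args) {
        // get-update-put: read full list, modify, write full list back
        KeyValueStore<String, List<Integer>> store = new KeyValueStore<>();
        store.put("k", new ArrayList<>());
        List<Integer> current = store.get("k");
        current.add(1);
        store.put("k", current);

        // merge/append: one call per new element
        ListState<Integer> state = new ListState<>();
        state.add(1);
        state.add(2);

        int sum = 0;
        for (int v : state.get()) sum += v;
        System.out.println(sum); // prints 3
    }
}
```

With an in-memory HashMap the difference is cosmetic; it matters once each put/get crosses a serialization boundary, as discussed below in the thread.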

I hope this clarifies things a bit. :-)

Cheers,
Aljoscha

On Wed, 4 May 2016 at 00:28 Elias Levy <[hidden email]> wrote:
Till,

Thanks again for putting this together.  It is certainly along the lines of what I want to accomplish, but I see a problem with it.  In your code you use a ValueState to store the priority queue.  If you expect to store a lot of values in the queue, then you are likely to use RocksDB as the state backend.  But if you use a ValueState for the priority queue with RocksDB as the backend, the whole priority queue will be deserialized and reserialized each time you add an item to it.  That becomes a crushing cost as the queue grows.
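The cost being described can be made concrete with a self-contained sketch that uses plain Java serialization as a stand-in for the state backend's serializer (class and method names here are illustrative, not Flink's): re-serializing the whole queue on every add costs O(n) per update, so total bytes written grow quadratically, while writing only the new element grows linearly.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

// Illustrative sketch: why a ValueState holding a whole collection gets
// expensive on a serializing backend, compared to per-element appends.
public class SerializationCost {
    static int serializedSize(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.size();
    }

    // Returns {bytes written re-serializing the whole queue on each add,
    //          bytes written serializing only the new element on each add}.
    static long[] totals(int n) throws IOException {
        long whole = 0, perElement = 0;
        ArrayList<Long> queue = new ArrayList<>();
        for (long i = 0; i < n; i++) {
            queue.add(i);
            whole += serializedSize(queue);  // ValueState-style update
            perElement += serializedSize(i); // ListState-style append
        }
        return new long[] {whole, perElement};
    }

    public static void main(String[] args) throws IOException {
        long[] t = totals(1000);
        System.out.println("whole-queue total bytes: " + t[0]);
        System.out.println("per-element total bytes: " + t[1]);
    }
}
```

Running this shows the whole-queue totals dwarfing the per-element totals, and the gap widens as n grows.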

I could instead use a ListState with the RocksDB backend; that way only the single value being added is serialized on an add.  But the get() operation in RocksDBListState seems very inefficient, defeating the idea of working with data sets that don't fit in memory.  It loads all values into a List instead of returning an Iterable that walks the values via the RocksDB scan API.  Samza has the advantage here, as it provides an ordered KV state API that allows you to truly iterate over the values in RocksDB, and a caching layer to batch writes into RocksDB. I am surprised there is no OrderedKeyValueStore API in Flink.
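The shape of the suggested alternative can be sketched in plain Java (hypothetical class, not Flink code): back the list state with an ordered store and have get() return a lazily evaluated Iterable rather than a fully materialized List. A TreeMap stands in for RocksDB's byte-ordered key space.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Flink's RocksDBListState: list state backed by an
// ordered store, where get() walks entries on demand (as a RocksDB scan
// could) instead of copying every value into an in-memory List first.
public class LazyListState<T> {
    // key = insertion index, as a stand-in for RocksDB's sorted byte keys
    private final TreeMap<Long, T> backend = new TreeMap<>();
    private long nextIndex = 0;

    public void add(T value) {
        backend.put(nextIndex++, value);
    }

    public Iterable<T> get() {
        // Lazily adapt the entry iterator; nothing is materialized up front.
        return () -> {
            Iterator<Map.Entry<Long, T>> it = backend.entrySet().iterator();
            return new Iterator<T>() {
                public boolean hasNext() { return it.hasNext(); }
                public T next() { return it.next().getValue(); }
            };
        };
    }

    public static void main(String[] args) {
        LazyListState<String> state = new LazyListState<>();
        state.add("a");
        state.add("b");
        StringBuilder sb = new StringBuilder();
        for (String s : state.get()) sb.append(s);
        System.out.println(sb); // prints ab
    }
}
```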

Given that only the RocksDB backend can store more state than fits in memory, and given the cost associated with its get() method when keeping track of a list, it seems there isn't a good way to keep track of large state in the form of a list or ordered list in Flink right now.


On Wed, Apr 20, 2016 at 10:13 AM, Till Rohrmann <[hidden email]> wrote:
The stream operator would do the following: collect the inputs from (x, y) and z, which are already keyed. Thus, we know that x = z holds. We order the elements in a priority queue because we don't know in which order they arrive at the operator. Whenever we receive a watermark indicating that no earlier events can arrive anymore, we can go through the two priority queues to join the elements. The queues are part of the operator's state, so that we don't lose information in case of a recovery.
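The mechanics above can be sketched with a toy operator in plain Java (simplified to a single key, joining on equal timestamps; all names are illustrative, not Flink's API): buffer both sides in priority queues ordered by timestamp, and on a watermark drain and join everything at or before it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Toy sketch of the watermark-driven join described above. In a real
// operator the queues would live in checkpointed state per key.
public class WatermarkJoin {
    record Event(long timestamp, String value) {}

    private final PriorityQueue<Event> left =
        new PriorityQueue<>(Comparator.comparingLong(Event::timestamp));
    private final PriorityQueue<Event> right =
        new PriorityQueue<>(Comparator.comparingLong(Event::timestamp));

    void processLeft(Event e)  { left.add(e); }
    void processRight(Event e) { right.add(e); }

    // On a watermark, no earlier events can arrive anymore: join all
    // buffered elements up to (and including) the watermark timestamp.
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!left.isEmpty() && left.peek().timestamp() <= watermark) {
            Event l = left.poll();
            // discard right elements that can no longer find a match
            while (!right.isEmpty() && right.peek().timestamp() < l.timestamp()) {
                right.poll();
            }
            if (!right.isEmpty() && right.peek().timestamp() == l.timestamp()) {
                out.add(l.value() + "-" + right.poll().value());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        WatermarkJoin join = new WatermarkJoin();
        join.processRight(new Event(2, "z2")); // out-of-order arrival is fine
        join.processLeft(new Event(1, "x1"));
        join.processLeft(new Event(2, "x2"));
        System.out.println(join.onWatermark(2)); // prints [x2-z2]
    }
}
```

Unmatched elements are simply dropped here; a production operator would also have to decide what to do with them (emit, side-output, or keep longer).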

I've sketched such an operator here [1]. I hope this helps you to get started.