Re: Apache Flink and serious streaming stateful processing

Posted by Krzysztof Zarzycki on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Apache-Flink-and-serious-streaming-stateful-processing-tp1865p3112.html

Hi guys! 
I'm sorry I abandoned this thread, but I had to give up on Flink for some time. Now I'm back and would like to resurrect it. Flink has evolved rapidly in the meantime, so maybe new features will let me do what I want. By the way, I heard only good things about you from the Flink Forward conference!

First, about back-pressure: as you said, it works well, so I'm taking it as a given. Sounds great! 

Let's focus now on stateful processing: 

To back up what I mean, here are some numbers for the state I'm currently holding: 
My stream processing program keeps around 300 GB of state for 1 month of data, but it will soon hold around 2 months, so twice as much (600 GB). The state is a key-value store, where the key is a user id and the value is a list of events correlated with that user. There are tens of millions of keys - unique user ids. The stream is partitioned on user id, so my state can be partitioned on user id as well. 
Currently I keep this "state" in Cassandra, i.e. externally to the program, but this is my biggest pain point, as the communication cost is large, especially when I reprocess my streaming data. 

Now what I would like to have is some abstraction available in Flink that lets me keep the state out-of-core, but embedded. I would use it as a key-value store, and Flink would journal & replicate all update operations, so that they are recoverable when the state (or a partition of it) is lost on failure. 
To describe my idea in code, I imagine the following pseudocode (totally abstracted from Flink):
class MyProcessor { 
  // imagined API: an embedded, recoverable key-value state
  val keyValueState = Flink.createKeyValueState[String, List[String]]("name-it")

  def processRecord(r: Record): Unit = { 
    val userId = r.get("userId")
    val userList = keyValueState.get(userId).getOrElse(List.empty)
    keyValueState.put(userId, userList :+ r.get("someData"))
  }
}

Something similar exists in Samza, with these guarantees:
- all puts are replicated (by writing each put to a separate Kafka topic), 
- on failure & recovery, the state is restored from the saved puts before processing restarts. 
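
To make the changelog idea concrete, here is a minimal sketch of it (my own illustration, not Samza's actual API): a local map whose puts are journaled to a Kafka topic through the standard Kafka producer, so that on recovery the topic can be replayed into the map before processing resumes.

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.collection.mutable

// Hypothetical changelog-backed state: every put is journaled to Kafka,
// so the local map can be rebuilt by replaying the topic after a failure.
class ChangelogKeyValueState(topic: String,
                             producer: KafkaProducer[String, String]) {
  private val local = mutable.Map.empty[String, String]

  def put(key: String, value: String): Unit = {
    producer.send(new ProducerRecord(topic, key, value))  // journal first
    local(key) = value
  }

  def get(key: String): Option[String] = local.get(key)

  // On recovery: feed the replayed changelog entries here before starting.
  def restore(entries: Iterator[(String, String)]): Unit =
    entries.foreach { case (k, v) => local(k) = v }
}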


Last time, you said that you're "working on making incrementally backed-up key/value state a first-class citizen in Flink, but it is still WIP". Has this changed since then? Do you already support the case I just described? 


Thanks for the idea of MapDB. I couldn't find any benchmark of MapDB's out-of-core performance, and I don't know yet whether it can match the performance of a RocksDB-like database, but I will try to find time to check it. 
In the meantime, this is the performance that attracts me to RocksDB (loading 1 billion key-values into the database):
- keys inserted in random order: 103 minutes, 80 MB/sec (total data size 481 GB) 
- keys inserted in sequential order: 36 minutes, 370 MB/sec (total data size 760 GB)
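
For illustration, this is roughly what the embedded, out-of-core usage I have in mind looks like with the RocksJava bindings (a sketch only; the path and keys are made up):

import org.rocksdb.{Options, RocksDB}

object EmbeddedStateSketch {
  RocksDB.loadLibrary()  // load the native RocksDB library

  def main(args: Array[String]): Unit = {
    val options = new Options().setCreateIfMissing(true)
    // Local on-disk store: data lives out-of-core, but access stays in-process.
    val db = RocksDB.open(options, "/tmp/embedded-state")
    try {
      db.put("user-42".getBytes, "eventA,eventB".getBytes)
      val value = Option(db.get("user-42".getBytes)).map(new String(_))
      println(value)  // Some(eventA,eventB)
    } finally db.close()
  }
}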


Cheers!
Krzysiek

2015-06-30 15:00 GMT+02:00 Ufuk Celebi <[hidden email]>:

On 30 Jun 2015, at 14:23, Gyula Fóra <[hidden email]> wrote:
> 2. We have support for stateful processing in Flink in many of the ways you have described in your question. Unfortunately the docs are down currently, but you should check out the 'Stateful processing' section in the 0.10 docs (once it's back online). We practically support an OperatorState interface which lets you keep partitioned state by some key and access it from runtime operators. The states declared using these interfaces are checkpointed and will be restored on failure. Currently all the states are stored in-memory, but we are planning to extend it to allow writing state updates to external systems.

http://flink.apache.org/docs/master/apis/streaming_guide.html#stateful-computation
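
From the 0.10 docs referenced above, usage of the OperatorState interface looks roughly like this sketch (reconstructed from memory; exact names and signatures may differ):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.OperatorState
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Hedged sketch: partitioned key/value state on a keyed stream, counting
// events per key. The state is scoped to the current key and checkpointed.
class EventCounter extends RichFlatMapFunction[(String, String), (String, Int)] {
  private var count: OperatorState[Integer] = _

  override def open(conf: Configuration): Unit = {
    count = getRuntimeContext.getKeyValueState("event-count", classOf[Integer], 0)
  }

  override def flatMap(in: (String, String), out: Collector[(String, Int)]): Unit = {
    val updated = count.value() + 1
    count.update(updated)
    out.collect((in._1, updated))
  }
}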