Apache Flink and serious streaming stateful processing


Apache Flink and serious streaming stateful processing

Krzysztof Zarzycki
Greetings!
I'm extremely interested in Apache Flink; I think you're doing a really great job! But please allow me to share two things I would require from Apache Flink before considering it groundbreaking (they are what I need from a streaming framework):

1. Stream backpressure. When the stream processing part does not keep up, please pause receiving new data. This is a serious problem in other frameworks, such as Spark Streaming; see the Spark ticket about it: https://issues.apache.org/jira/browse/SPARK-7398
2. Support for (serious) stateful processing. What I mean by that is being able to keep the application's state in key-value stores, out-of-core, in embedded mode. I want to be able to keep, let's say, the history of events from the last two months, grouped by and accessible by user_id, and I don't want to use an external database for that (e.g. Cassandra). Communicating with an external database would kill my performance, especially when *reprocessing* historical data. And I definitely don't want to fall back to batch processing (as in the Lambda Architecture).

These two are the most important gaps (IMHO) in Spark Streaming and the reason I'm not using it. Both are supported by Samza, whose code and API are not excellent, but which at least allows serious stream processing that does not require repeating the processing pipeline in batch (Hadoop).

I'm looking forward to seeing features like these in Flink. Or are they already there and I'm just missing something?

Thanks!
Krzysztof Zarzycki

Re: Apache Flink and serious streaming stateful processing

Gyula Fóra
Hi Krzysztof,

Thank you for your questions, we are happy to help you get started.

Regarding your questions:

1. There is backpressure for the streams, so if the downstream operators cannot keep up, the sources will slow down.

2. We support stateful processing in Flink in many of the ways you described in your question. Unfortunately the docs are down at the moment, but you should check out the 'Stateful processing' section in the 0.10 docs (once they are back online). In practice we provide an OperatorState interface that lets you keep state partitioned by some key and access it from runtime operators. The states declared using these interfaces are checkpointed and will be restored on failure. Currently all state is stored in memory, but we are planning to extend this to allow writing state updates to external systems.

I will send you some pointers once the docs are up again.

Cheers,
Gyula


Re: Apache Flink and serious streaming stateful processing

Stephan Ewen
Hi Krzysztof,

Thanks for the kind words! I think that Flink is to a good extent set up to provide what you are looking for. The remaining gaps are WIP.

Let me elaborate a bit on Gyula's answers:

1)
Backpressure is very much there; it has always worked well, even better than in Storm, as far as I can tell.

The way Flink builds streams is by shipping buffers through logical channels, which are multiplexed over back-pressured network channels. The buffers (on both the sender and receiver side) come from managed, bounded buffer pools. As soon as receivers slow down, a bounded amount of data will queue up in the buffer pools on both sides, and then the producing operator will block until space in the buffer pool is available.

The backpressure propagates all the way back to the sources, so eventually the source will stop pulling more data and leave it where it is (for example, in Kafka).
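The mechanism described above can be sketched outside of Flink with plain threads and a bounded queue standing in for the managed buffer pool (all names here are illustrative, not Flink API):

```python
import threading
import queue

# A bounded queue stands in for Flink's bounded buffer pool: when it is
# full, the producer's put() blocks, which is exactly how backpressure
# propagates from a slow consumer back to the source.
channel = queue.Queue(maxsize=4)
consumed = []

def source():
    for record in range(20):
        channel.put(record)   # blocks once 4 "buffers" are in flight
    channel.put(None)         # end-of-stream marker

def slow_operator():
    while True:
        record = channel.get()
        if record is None:
            break
        consumed.append(record)   # pretend this is expensive work

t1 = threading.Thread(target=source)
t2 = threading.Thread(target=slow_operator)
t1.start(); t2.start()
t1.join(); t2.join()

assert consumed == list(range(20))   # no records lost or reordered
```

The key property is that the source never needs to drop or buffer unboundedly; the blocking `put` is the pause signal.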


2)
This part is currently evolving, API-wise. It would be good to get your input to make sure we validate the design against real-world use cases. Let me make sure we understand correctly what you want to do:
You want to do stateful computation with a key/value state abstraction, but the state should not go into an external key/value store. It should be maintained in Flink, in an out-of-core-enabled fashion.

You can do much (but not all) of that right now. You can keep a hash map in your application and make state changes on it, and the hash map can be backed up by the fault-tolerance system.
This will, however, only work up to a certain size. The benefit (compared to Samza) is that state restoration is quite fast.

We are working on making incrementally backed-up key/value state a first-class citizen in Flink, but it is still WIP.

For now, concerning out-of-core state, you can experiment with embedding an out-of-core key/value database in your operators, something like http://www.mapdb.org
Because operators are long-lived (unlike in mini-batches), this DB will keep existing as well. You can even write a method that lets Flink back it up periodically into HDFS. It should work as long as the checkpoint interval is not too high.
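A rough sketch of this "embedded DB plus periodic backup" pattern, using Python's stdlib sqlite3 as a stand-in for MapDB and a local directory as a stand-in for HDFS (all paths and names are illustrative):

```python
import os
import sqlite3
import tempfile

workdir = tempfile.mkdtemp()
db_path = os.path.join(workdir, "state.db")

# Embedded key/value store living inside the (long-lived) operator.
db = sqlite3.connect(db_path)
db.execute("CREATE TABLE kv (k TEXT PRIMARY KEY, v TEXT)")

def put(k, v):
    db.execute("INSERT OR REPLACE INTO kv VALUES (?, ?)", (k, v))
    db.commit()

def checkpoint(n):
    """Back up the whole DB; in the real setup this copy would go to HDFS."""
    target = os.path.join(workdir, f"checkpoint-{n}.db")
    backup = sqlite3.connect(target)
    db.backup(backup)   # consistent snapshot even while the DB is in use
    backup.close()
    return target

put("user-1", "eventA")
snap = checkpoint(1)
put("user-1", "eventA,eventB")   # mutations after the checkpoint

# Restoring from the snapshot yields the state as of checkpoint time.
restored = sqlite3.connect(snap)
assert restored.execute("SELECT v FROM kv WHERE k='user-1'").fetchone()[0] == "eventA"
```

The cost of this scheme is that every checkpoint copies the full DB, which is why it only works while the checkpoint interval is generous relative to the state size.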


Let us know how far that gets you. We will also keep you posted with advances in the state abstraction.

Greetings,
Stephan





Re: Apache Flink and serious streaming stateful processing

Ufuk Celebi

On 30 Jun 2015, at 14:23, Gyula Fóra <[hidden email]> wrote:
> 2. We have support for stateful processing in Flink in many ways you have described in your question. [...]

http://flink.apache.org/docs/master/apis/streaming_guide.html#stateful-computation

Re: Apache Flink and serious streaming stateful processing

Krzysztof Zarzycki
Hi guys! 
I'm sorry I abandoned this thread, but I had to give up on Flink for some time. Now I'm back and would like to resurrect it. Flink has evolved rapidly in the meantime, so maybe new features will let me do what I want. By the way, I heard only good things about you from the Flink Forward conference!

First, about back-pressure: as you said, it works well, so I'm taking it for granted. Sounds great!

Let's focus now on stateful processing: 

To back up what I mean, here are some numbers on the state I'm currently holding:
My stream processing program keeps around 300 GB of state for one month, but it will soon hold around two months, so twice as much (600 GB). The state is a key-value store, where the key is a user id and the value is a list of events correlated with that user. There are tens of millions of keys (unique user ids). The stream is partitioned on user id, so my state can be partitioned on user id as well.
Currently I keep this "state" in Cassandra, i.e. externally to the program, but this is my biggest pain, as the communication cost is large, especially when I reprocess my streaming data.

Now, what I would like is an abstraction in Flink that lets me keep the state out-of-core, but embedded. I would use it as a key-value store, and Flink would journal & replicate all update operations, so they are recoverable on failure, when the state (or a partition of it) is lost.
To describe my idea in code, I imagine the following pseudocode (totally abstracted from Flink):
class MyProcessor { 
  val keyValueState = Flink.createKeyValueState("name-it")
  
  def processRecord(r: Record) { 
     val userList = keyValueState.get(r.get("userId"))
     userList += r.get("someData")
     keyValueState.put(r.get("userId"), userList)
  }
}

Something similar exists in Samza, with these guarantees:
- all puts are replicated (by writing each put to a separate Kafka topic),
- on failure & recovery, the state is restored from the saved puts before processing starts.
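The Samza-style changelog scheme described above can be sketched in a few lines; here a plain list stands in for the Kafka changelog topic, and all names are illustrative:

```python
class ChangelogKVStore:
    """In-memory KV store whose puts are journaled to a changelog.

    In Samza the changelog is a Kafka topic; a plain list stands in
    for it here.
    """

    def __init__(self, changelog):
        self.changelog = changelog
        self.state = {}

    def get(self, key, default=None):
        return self.state.get(key, default)

    def put(self, key, value):
        self.changelog.append((key, value))   # replicate the put first
        self.state[key] = value

    @classmethod
    def recover(cls, changelog):
        """Rebuild the state by replaying the saved puts, before
        processing resumes."""
        store = cls(changelog=list(changelog))
        for key, value in changelog:
            store.state[key] = value
        return store

log = []
store = ChangelogKVStore(log)
store.put("user-1", ["eventA"])
store.put("user-1", ["eventA", "eventB"])

# Simulate a crash: only the changelog survives.
recovered = ChangelogKVStore.recover(log)
assert recovered.get("user-1") == ["eventA", "eventB"]
```

Replay cost grows with the changelog length, which is why Samza relies on Kafka log compaction to keep only the latest put per key.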


Last time, you said you were "working on making incrementally backed-up key/value state a first-class citizen in Flink, but it is still WIP". Has this changed since then? Do you already support the case I just described?


Thanks for the idea of MapDB. I couldn't find any benchmark of MapDB's out-of-core performance, and I don't know yet whether it can match the performance of a RocksDB-like database, but I will try to find time to check it.
In the meantime, these are the numbers that attract me to RocksDB:
- Loading 1B keys inserted in random order: 103 minutes, 80 MB/sec (total data size 481 GB, 1 billion key-values)
- Loading 1B keys inserted in sequential order: 36 minutes, 370 MB/sec (total data size 760 GB)


Cheers!
Krzysiek



Re: Apache Flink and serious streaming stateful processing

Gyula Fóra
Hi!

You have an interesting use case that I think comes up in many applications (in fact I will be working on something very similar shortly).

Stephan has made some nice changes (this PR) to the State interfaces, supporting flexible backends, which can be used to implement this functionality. While the new state backends support more efficient checkpoints for KV states, they still do not support incremental snapshots or keeping out-of-core state.

My current plan is to implement a caching layer on top of the storage layer (Cassandra in your case) that would keep only the "hot" keys in the Flink streaming operators and evict the cold keys to external storage on checkpoints.

One thing we need to be careful about in this case is that we should only overwrite elements in the storage once we know that a checkpoint is complete (we can use checkpoint notifications for this). This assumes some sort of versioning in the storage layer.

I think this will definitely not make it into 0.10, but I am confident we will have something working for the next (1.0) release. I suggest you experiment with the logic; maybe you can get something working much sooner :)
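The caching layer sketched above could look roughly like this; a dict stands in for Cassandra, and the versioned-write / confirm-on-checkpoint-completion discipline is the point being illustrated (all class and method names are hypothetical):

```python
class CheckpointAwareCache:
    """Keeps hot keys in the operator; writes dirty entries to external
    storage tagged with a checkpoint id, so a failed checkpoint never
    overwrites the last known-good version."""

    def __init__(self, external):
        self.external = external   # {key: [(checkpoint_id, value), ...]}
        self.hot = {}
        self.dirty = set()

    def get(self, key):
        if key not in self.hot:    # cache miss: load the cold key
            versions = self.external.get(key, [])
            self.hot[key] = versions[-1][1] if versions else None
        return self.hot[key]

    def put(self, key, value):
        self.hot[key] = value
        self.dirty.add(key)

    def on_checkpoint(self, checkpoint_id):
        # Write dirty keys as a NEW version; do not overwrite old ones yet.
        for key in self.dirty:
            self.external.setdefault(key, []).append((checkpoint_id, self.hot[key]))

    def on_checkpoint_complete(self, checkpoint_id):
        # Only now is it safe to drop superseded versions.
        for versions in self.external.values():
            versions[:] = [v for v in versions if v[0] >= checkpoint_id] or versions[-1:]
        self.dirty.clear()

storage = {}
cache = CheckpointAwareCache(storage)
cache.put("user-1", "v1")
cache.on_checkpoint(1)
cache.on_checkpoint_complete(1)
cache.put("user-1", "v2")
cache.on_checkpoint(2)   # checkpoint 2 written but not yet confirmed
assert storage["user-1"] == [(1, "v1"), (2, "v2")]   # old version still recoverable
```

If checkpoint 2 fails, recovery can still read the version tagged with checkpoint 1, which is exactly the property the versioning requirement buys.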

Cheers,
Gyula



Re: Apache Flink and serious streaming stateful processing

Stephan Ewen
Hi!

As Gyula mentioned, an upcoming pull request will make the state backend pluggable. We would like to add the following state holders to Flink:

(1) Small state in memory (local execution / debugging): state maintained in a heap hash map, checkpointed to the JobManager. This is in there now.

(2) Large state in memory: state maintained as a heap hash map, checkpointed to a distributed file system. This is in there now as well.

(3) Large state in memory / out-of-core, with (incremental) checkpoints to a file system.

(4) State maintained externally, with hot keys cached in Flink. This is what Gyula is working on right now.


I think point (3) is what you are looking for.

There are actually two variants of (3) we could attempt (3.1 and 3.2): (3.1) would keep the state in an out-of-core index inside Flink, while (3.2) would use an embedded LevelDB or similar to store the key/value pairs.
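A minimal sketch of what "pluggable state backends" means in practice: one interface, with the storage strategy swapped behind it. The interface and class names here are invented for illustration, not Flink's actual API:

```python
from abc import ABC, abstractmethod
import json
import os
import tempfile

class StateBackend(ABC):
    """Operator code talks only to this interface; where the bytes live
    (heap, file system, embedded DB) is the backend's business."""

    @abstractmethod
    def put(self, key, value): ...

    @abstractmethod
    def get(self, key): ...

    @abstractmethod
    def snapshot(self): ...   # returns a handle the checkpoint coordinator can store

class HeapBackend(StateBackend):
    """Variants (1)/(2): state in a heap hash map."""
    def __init__(self):
        self.state = {}
    def put(self, key, value):
        self.state[key] = value
    def get(self, key):
        return self.state.get(key)
    def snapshot(self):
        return dict(self.state)   # full copy; fine for small state

class FileBackend(StateBackend):
    """Toward variant (3): each key lives on disk, not on the heap."""
    def __init__(self, directory):
        self.directory = directory
    def _path(self, key):
        return os.path.join(self.directory, f"{key}.json")
    def put(self, key, value):
        with open(self._path(key), "w") as f:
            json.dump(value, f)
    def get(self, key):
        try:
            with open(self._path(key)) as f:
                return json.load(f)
        except FileNotFoundError:
            return None
    def snapshot(self):
        return self.directory   # state is already durable on disk

def run_operator(backend: StateBackend):
    # The operator logic is identical regardless of the backend.
    backend.put("user-1", ["eventA"])
    return backend.get("user-1")

assert run_operator(HeapBackend()) == ["eventA"]
assert run_operator(FileBackend(tempfile.mkdtemp())) == ["eventA"]
```

The point of the pluggable design is exactly this last pair of lines: the same operator code runs against either backend, so out-of-core or embedded-DB storage can be added later without touching user programs.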


What we have by now is the pluggable backend. I would like to start on (3) after the 0.10 release is out, which should be very soon; we have begun the fixing and testing phase.
How fast a prototype of this state backend becomes available depends a bit on how well MapDB, LevelDB, or any of those can be embedded for our case. I hope it is pretty soon :-)

Greetings,
Stephan

