Hi,

I've been doing some testing with Flink Stateful Functions in Python and have a small POC working. One of our key requirements for our stream processors is that they be written in Python, due to the skill set of our team. Given that the Python DataStream API still seems to be under development in Flink 1.12, we've implemented our business logic as a stateful function using the remote pattern.

In testing, it appears the state object is serialized and sent along with each HTTP request. Since we're storing quite a bit of data in this state, this seems to contribute linearly to the application's latency. Is there any way around this? Is there a way to store the state local to the Python application?

Thanks,
Dan
Hi,

Nice to hear that you are trying out StateFun!

It is by design that function state is attached to each HTTP invocation request, as defined by StateFun's remote invocation request-reply protocol. This decision was made with typical cloud-native application architectures in mind: having function deployments be stateless, with no session dependencies between the StateFun runtime and the function services, allows the functions to scale out very easily.

There are some discussions on potentially adding a bi-directional protocol in the future so that state can be lazily fetched on demand instead of on every invocation, but that is still in very early stages of discussion.

Could you briefly describe what the state access pattern in your application looks like? Maybe this can give us some insight into how a more advanced / efficient protocol should be designed in future releases.

On Thu, Oct 8, 2020, 6:20 PM Clements, Danial C <[hidden email]> wrote:
One clarification here: StateFun does not serialize or deserialize state; everything is maintained and provided to functions as byte arrays. Serialization / deserialization happens in user code (i.e. in the functions).

Cheers,
Gordon
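To make that byte-level contract concrete, here is a minimal hypothetical sketch (plain Python, not the actual StateFun SDK API) of what a remote function handler sees under the request-reply protocol: the runtime attaches the current state as opaque bytes to each invocation, and all (de)serialization happens in the function itself:

```python
import json

def handle_invocation(state_bytes, message):
    # The runtime ships the registered state value as raw bytes with
    # every request; interpreting those bytes is entirely the function's job.
    state = json.loads(state_bytes) if state_bytes else {"count": 0}

    # ... business logic would go here; as a stand-in, bump a counter ...
    state["count"] += 1

    # The reply carries the (possibly mutated) state back as bytes; the
    # runtime persists them without ever inspecting their contents.
    return json.dumps(state).encode("utf-8")

print(handle_invocation(b'{"count": 2}', message=None))  # b'{"count": 3}'
```

This is also why large state hurts latency linearly, as Dan observed: the full value travels over HTTP on every single invocation, regardless of how much of it the function actually reads.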
Hi,

This makes sense, and I can appreciate the stateless aspect of the remote functions. We have a number of components that need access to quite a bit of data. The idea was to key the incoming stream in a way that minimizes calls to a reference DB, and then store that result set in the state so it would be readily available for subsequent messages with the same key. Additionally, I had hoped to use delayed messages as a way of invalidating the cache after a certain amount of time. Please tell me if this is an antipattern, as this project is really my first foray into stream processing.

For us, Python is a hard requirement, so I was hoping that the state would be similar to other Flink jobs, where it's local to the processor; given the remote stateful architecture, though, it completely makes sense why it isn't.

On a separate topic, is anyone using StateFun in production?

Thanks,
Dan
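The caching pattern described above (key the stream, cache a reference-DB result set in state, invalidate via a delayed message) can be sketched in plain Python. The `Context` class, `send_after`, and `query_reference_db` here are hypothetical stand-ins for illustration, not the actual StateFun SDK:

```python
import json

CACHE_TTL_SECONDS = 3600  # hypothetical invalidation delay

class Context:
    """Hypothetical stand-in for a per-key function context."""
    def __init__(self, key):
        self.key = key
        self.state = None          # opaque cached bytes, or None
        self.delayed = []          # recorded (delay_seconds, target_key, message)

    def send_after(self, delay, target, message):
        # Stand-in for scheduling a delayed message to a target address.
        self.delayed.append((delay, target, message))

def query_reference_db(key):
    # Stand-in for the expensive reference-DB lookup.
    return {"key": key, "rows": [1, 2, 3]}

def handle(context, message):
    if message == "invalidate":
        context.state = None       # drop the cache once the TTL message fires
        return None
    if context.state is None:
        # Cache miss: hit the DB once, cache the result set for this key,
        # and schedule a delayed self-message to invalidate it later.
        rows = query_reference_db(context.key)
        context.state = json.dumps(rows).encode("utf-8")
        context.send_after(CACHE_TTL_SECONDS, context.key, "invalidate")
    return json.loads(context.state)

ctx = Context("customer-42")
handle(ctx, {"event": "first"})            # misses the cache, queries the DB
result = handle(ctx, {"event": "second"})  # served from cached state
```

Because the stream is keyed, every message for `customer-42` lands on the same function instance and sees the same cached state, so the DB is queried at most once per key per TTL window.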
Hi,

On Fri, Oct 9, 2020, 4:20 PM Clements, Danial C <[hidden email]> wrote:
This is definitely not an antipattern! Co-sharding state and message streams so that compute may benefit from local state access instead of requiring remote queries is one of the key principles of distributed stateful stream processing. Your idea of invalidating old state is also very sane. You can actually just set a state TTL for that [1].
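For reference, a state TTL for a remote function is declared per state in the module definition rather than in the Python code. A hedged sketch of what this might look like in a StateFun 2.2-era `module.yaml` (the function type, endpoint, and state name are made up, and the exact field names, in particular `expireAfter`, should be checked against the docs for your StateFun version):

```yaml
module:
  meta:
    type: remote
  spec:
    functions:
      - function:
          meta:
            kind: http
            type: example/cache-function        # hypothetical function type
          spec:
            endpoint: http://python-worker:8000/statefun
            states:
              - name: cached_rows               # hypothetical state name
                expireAfter: 1hour              # TTL-based invalidation
```

With a TTL declared this way, the runtime drops the expired state itself, so the delayed-message invalidation scheme becomes unnecessary.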
When using non-JVM languages, state must always somehow be transported out of the Flink JVM processes (where the state is maintained) to the functions, whether over a local or remote network. This is the same for all Python libraries on top of Flink, such as Flink's Python Table API or Apache Beam's Python API; both of these require transporting state over a local network.

If you'd like to use StateFun for the programming constructs and dynamic messaging flexibility it provides, you actually have many different function deployment options. For example, with the remote deployment approach I explained in my previous email, functions are deployed as services separate from the Flink StateFun cluster and can benefit from rapid scalability and zero-downtime upgrades / live reloads. Alternatively, if you prefer performance over operational flexibility, you can consider the sidecar / co-location deployment approach [2].

Cheers,
Gordon
Hi Gordon,

Glad to hear this is all in line with the patterns for StateFun! I think one thing that still trips me up in understanding the relationship between Flink and StateFun is how masters and workers come into play. In the case of remote functions, what are the scaling requirements for the Flink workers? How do you know when you'll need to add more instances?

Thanks,
Dan