Remote Stateful Function Scalability


Elias Levy
After reading the Stateful Functions documentation, I am left wondering how remote stateful functions scale.

The documentation mentions that the use of remote functions allows the state and compute tiers to scale independently. But the documentation seems to imply that only a single instance of a function type can execute at a time per worker ("When an application starts, each parallel worker of the framework will create one physical object per function type. This object will be used to execute all logical instances of that type that are run by that particular worker.") That would seem to tie and limit the parallelism of the compute layer to that of the storage layer even when using remote functions.

Can a worker execute multiple concurrent remote stateful functions of different types?

Can a worker execute multiple concurrent remote stateful functions of the same type with different keys?

If a worker can execute multiple concurrent remote stateful functions of the same type with different keys, does it ensure their output is ordered like its inputs?


Re: Remote Stateful Function Scalability

Tzu-Li (Gordon) Tai
Hi Elias,

On Sun, Oct 18, 2020 at 6:16 AM Elias Levy <[hidden email]> wrote:
After reading the Stateful Functions documentation, I am left wondering how remote stateful functions scale.

The documentation mentions that the use of remote functions allows the state and compute tiers to scale independently. But the documentation seems to imply that only a single instance of a function type can execute at a time per worker ("When an application starts, each parallel worker of the framework will create one physical object per function type. This object will be used to execute all logical instances of that type that are run by that particular worker.") That would seem to tie and limit the parallelism of the compute layer to that of the storage layer even when using remote functions.

Your observation is correct only for embedded functions, not for remote functions.
For remote functions, in the StateFun workers each physical object per function type acts as an asynchronous invocation dispatcher to the type's remote function service.

To briefly summarize what the dispatcher does:
The dispatcher only ensures sequential invocation per logical address (function type + logical instance ID / key).
Invocations for different logical addresses (different types / different keys) can happen concurrently.

If an invocation request for a logical address is in-flight, and other messages targeted for that address arrive, they are buffered in a backlog (state) until the pending request completes.
Upon completion, the backlog is flushed and all buffered messages are sent to the remote function as a single batch invocation request.
Backpressure is applied once the backlog size reaches a threshold.
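
The dispatch rules described above can be modeled with a small toy sketch. This is not StateFun code: the class name, the method names, the per-address threshold, and the "return False to signal backpressure" convention are all assumptions made for illustration. It only shows the three behaviors mentioned: one in-flight request per logical address, buffering of later messages in a backlog, and flushing the backlog as a single batch on completion.

```python
from collections import defaultdict, deque


class DispatcherSketch:
    """Toy model of per-address dispatch (illustrative only, not StateFun's implementation)."""

    def __init__(self, max_backlog=3):
        self.max_backlog = max_backlog      # hypothetical backpressure threshold (per address)
        self.in_flight = set()              # logical addresses with a pending request
        self.backlog = defaultdict(deque)   # logical address -> buffered messages
        self.sent = []                      # (address, batch) requests sent to the remote function

    def on_message(self, address, message):
        """Accept a message; returns False when the caller should be backpressured."""
        if address not in self.in_flight:
            # No pending request for this address: invoke right away.
            self._send(address, [message])
            return True
        if len(self.backlog[address]) >= self.max_backlog:
            # Backlog reached the threshold: do not accept the message.
            return False
        self.backlog[address].append(message)
        return True

    def on_complete(self, address):
        """The pending request for this address finished; flush its backlog as one batch."""
        self.in_flight.discard(address)
        pending = self.backlog.pop(address, None)
        if pending:
            self._send(address, list(pending))

    def _send(self, address, batch):
        self.in_flight.add(address)
        self.sent.append((address, batch))


# A logical address is modeled as (function type, key); the names are made up.
addr_a = ("example/greeter", "alice")
addr_b = ("example/greeter", "bob")

d = DispatcherSketch(max_backlog=2)
d.on_message(addr_a, "m1")   # sent immediately
d.on_message(addr_b, "m2")   # different key: sent while m1 is still in flight
d.on_message(addr_a, "m3")   # same address as m1: buffered
d.on_message(addr_a, "m4")   # buffered
d.on_complete(addr_a)        # m3 and m4 go out together as a single batch
```

After this sequence, `d.sent` holds three requests: `m1` and `m2` dispatched concurrently for the two keys, then one batch `["m3", "m4"]` for the first key.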

All in all, in vanilla Flink terms, this works much like Flink's Async I/O operator in its unordered mode, i.e. the stream order is not preserved.

So, to conclude by answering your specific questions:
 

Can a worker execute multiple concurrent remote stateful functions of different types?

Yes.
 

Can a worker execute multiple concurrent remote stateful functions of the same type with different keys?

Yes.
 

If a worker can execute multiple concurrent remote stateful functions of the same type with different keys, does it ensure their output is ordered like its inputs?

No. Currently, StateFun emits outgoing messages (i.e. messages going to other functions / egresses) purely in the order in which the concurrent invocation requests complete.
However, I believe it should be possible to support an ordered mode here at the cost of extra latency (results that complete early would need to be buffered, which adds checkpoint overhead, etc.).
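
To make the trade-off concrete, here is a minimal sketch of what such an ordered mode could look like. It is purely hypothetical and does not correspond to an existing StateFun feature: it assumes each invocation request is tagged with an input sequence number (`seq`), and results that complete early are held back until every earlier result has been emitted. The held-back results are exactly the extra state that would have to be buffered and checkpointed.

```python
class OrderedEmitter:
    """Hypothetical reorder buffer: releases results in input order, not completion order."""

    def __init__(self):
        self.next_seq = 0    # sequence number of the next result allowed out
        self.early = {}      # seq -> result, for results that completed ahead of their turn
        self.emitted = []    # results released downstream, in input order

    def on_result(self, seq, result):
        self.early[seq] = result
        # Release every result that is now contiguous with what was already emitted.
        while self.next_seq in self.early:
            self.emitted.append(self.early.pop(self.next_seq))
            self.next_seq += 1


emitter = OrderedEmitter()
emitter.on_result(2, "r2")   # completed early: buffered, nothing emitted yet
emitter.on_result(0, "r0")   # r0 is released; r2 still waits for r1
emitter.on_result(1, "r1")   # r1 is released, which also unblocks r2
```

The first result to complete (`r2`) is the last one emitted, which is where the extra latency comes from.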

Hope this helps clarify some things!

Cheers,
Gordon
Re: Remote Stateful Function Scalability

Seth Wiesman
As a note, I wrote that concepts section before remote functions were implemented. I've made a note to myself to go through and update it.

Seth

In reply to Tzu-Li (Gordon) Tai's message of Sat, Oct 17, 2020 at 9:29 PM.