Flink Statefun Python Batch

Flink Statefun Python Batch

Timothy Bess
Hi everyone,

Is there a good way to access the batch of leads that StateFun sends to the Python SDK, rather than processing events one by one? We're trying to run our data scientist's machine learning model through the SDK, but the code is very slow when we process single events, and we don't get many of the benefits of Pandas and similar libraries.

Thanks,

Tim

Re: Flink Statefun Python Batch

Igal Shilman
Hi Tim!

Indeed, the StateFun SDK and the StateFun runtime have an internal concept of batching that kicks in in the presence of a slow or congested remote function. Keep in mind that under normal circumstances batching does not happen (effectively a batch of size 1 is sent). [1]
This batch is not currently exposed via the SDKs (neither Java nor Python), as it is an implementation detail (see [2]).

The way I understand your message (please correct me if I'm wrong) is that evaluating the ML model is costly, and it would benefit from some sort of batching (as pandas does, I assume) instead of being applied to every event individually.
If that is the case, perhaps exposing this batch could be a useful feature to add.

For example:

@functions.bind_tim(..)
def ml(context, messages: typing.List[Message]):
  ...
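
A batch function along these lines could then build a single DataFrame for the whole batch. A minimal sketch — the decorator above is a proposal, not an existing API, and the "amount" field and the score helper here are invented stand-ins for the real payload and model:

import typing
import pandas as pd

def score(df: pd.DataFrame) -> pd.Series:
    # Stand-in for the real model: any vectorized computation works here.
    return df["amount"] * 0.1

def ml(messages: typing.List[dict]) -> typing.List[float]:
    # One DataFrame per batch, so the fixed cost of constructing and
    # operating on a DataFrame is paid once, not once per event.
    df = pd.DataFrame(messages)
    return score(df).tolist()

# A batch of three events scored in a single call:
print(ml([{"amount": 100.0}, {"amount": 250.0}, {"amount": 40.0}]))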



Let me know what you think,
Igal.

Re: Flink Statefun Python Batch

Timothy Bess
Hi Igal,

Yes! That's exactly what I was thinking. The batching will naturally happen as the model applies backpressure. We're using pandas, and it's pretty costly to create a DataFrame and everything else just to process a single event. Internally the SDK has access to the batch but calls my function once per event, so we create a DataFrame for each individual event. That causes a ton of overhead, since we basically get destroyed by the constant factors around creating and operating on DataFrames.
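
To put a rough number on those constant factors, here's a self-contained timing sketch — nothing StateFun-specific, illustrative only:

import time
import pandas as pd

events = [{"amount": float(i)} for i in range(10_000)]

# One DataFrame per event, as happens when the function is called per message.
start = time.perf_counter()
for e in events:
    _ = pd.DataFrame([e])["amount"] * 0.1
per_event = time.perf_counter() - start

# One DataFrame for the whole batch.
start = time.perf_counter()
_ = pd.DataFrame(events)["amount"] * 0.1
batched = time.perf_counter() - start

print(f"per-event: {per_event:.3f}s, batched: {batched:.3f}s")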

Knowing how the SDK works, it seems like it'd be easy to do something like your example, and maybe have a different decorator for "batch functions" where the SDK just passes everything in at once.

Also, just out of curiosity: are there plans to build out more introspection into StateFun's Flink state? I was thinking it would be super useful to add either queryable state or a control topic that StateFun listens to, so I could send events to introspect or modify Flink state.

For example, something like:

// control topic request ("kind" rather than "type" for the message kind,
// since "type" already names the function type)
{"kind": "FunctionIdsReq", "namespace": "foo", "type": "bar"}
// response
{"kind": "FunctionIdsResp", "ids": ["1", "2", "3", ...]}

Or

{"type": "SetState", "namespace": "foo", "type": "bar", "id": "1", value: "base64bytes"}
{"type": "DeleteState", "namespace": "foo", "type": "bar", "id": "1"}

Also, having OpenTracing integration where StateFun passes B3 headers with each request, so that we can trace a message's route through StateFun, would be _super_ useful. We'd literally be able to see the entire path of an event from ingress to egress, and the time spent in each function. Not sure if there are any plans around that, but since we're live with a StateFun project now, it's possible we could contribute some of it if you're open to that.
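
For context, B3 propagation mostly amounts to copying a few headers between hops; a rough sketch of the header handling, independent of any SDK support (the function name is made up):

import uuid

def child_span_headers(incoming: dict) -> dict:
    # Continue the incoming trace: same trace id, fresh span id,
    # and the caller's span id becomes our parent span id.
    trace_id = incoming.get("x-b3-traceid") or uuid.uuid4().hex
    return {
        "x-b3-traceid": trace_id,
        "x-b3-spanid": uuid.uuid4().hex[:16],
        "x-b3-parentspanid": incoming.get("x-b3-spanid", ""),
        "x-b3-sampled": incoming.get("x-b3-sampled", "1"),
    }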

Thanks,

Tim

Re: Flink Statefun Python Batch

Igal Shilman
Hi Tim,

Yes, I think this feature can be implemented relatively fast.
If this is blocking you at the moment, I can prepare a branch for you to experiment with in the coming days.

Regarding the OpenTracing integration, I think the community can benefit a lot from this, and contributions are definitely welcome!

[hidden email], would you like to understand Tim's use case with OpenTracing in more depth?

Thanks,
Igal.

Re: Flink Statefun Python Batch

Konstantin Knauf
Hi Igal, Hi Timothy, 

This sounds very interesting. Both state introspection and OpenTracing support have been requested by multiple users before, so this is certainly something we are willing to invest in. Timothy, would you have time for a 30-minute call in the next few days so we can understand your use case and requirements better? In the meantime, let's document these feature requests in Jira:

* Exposing Batches to SDKs: https://issues.apache.org/jira/browse/FLINK-22389
* Support for OpenTracing: https://issues.apache.org/jira/browse/FLINK-22390
* Support for State Introspection: https://issues.apache.org/jira/browse/FLINK-22391

Please feel free to edit or comment on these issues directly, too.

Cheers,

Konstantin

Re: Flink Statefun Python Batch

Timothy Bess
Hi Igal and Konstantin,

Wow! I appreciate the offer to create a branch to test with, but for now we were able to get things working by tuning a few configs and moving other blocking IO out of StateFun, so no rush there. That said, if you do add it, I'd definitely switch over.

That's great! I'll try to think up some suggestions to put on those tickets. And yes, I'd be up for a call on Thursday or Friday if you're free then; just let me know (my timezone is EDT).

Thanks,

Tim

Re: Flink Statefun Python Batch

Igal Shilman
Hi Tim,

I've created a tiny PoC; let me know if this helps. I can't guarantee, though, that this is how we'll eventually approach it, but it should be somewhere along these lines.


Thanks,
Igal.