Stateful Functions + ML model prediction


Stateful Functions + ML model prediction

John Morrow
Hi Flink Users,

I'm using Flink to process a stream of records containing a text field. The records are sourced from a message queue, enriched as they flow through the pipeline based on business rules and finally written to a database. We're using the Ververica platform so it's running on Kubernetes.

The initial business rules were straightforward, e.g. if field X contains a certain word then set field Y to a certain value. For the implementation I began by looking at https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html for inspiration. I ended up implementing a business rule as a Java class with a match-predicate & an action. The records enter the pipeline on a data stream which is joined with the rules in a broadcast stream and a ProcessFunction checks each record to see if it matches any rule predicates. If the record doesn't match any business rule predicates it continues on in the pipeline. If the record does match one or more business rule predicates it is sent to a side output with the list of business rules that it matched. The side output data stream goes through a RichAsyncFunction which loops through the matched rules and applies each one's action to the record. At the end, that enriched side-output record stream is unioned back with the non-enriched record stream. This all worked fine.
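For reference, the rule shape I ended up with looks roughly like this: a simplified, self-contained sketch in plain Java (class and field names here are illustrative, not the actual pipeline types):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

class RuleSketch {

    // Stand-in for the pipeline record; only the fields the sketch needs.
    static class TextRecord {
        String text;
        String fieldY;
        TextRecord(String text) { this.text = text; }
    }

    // A rule pairs a match-predicate with an enrichment action.
    static class BusinessRule {
        final Predicate<TextRecord> predicate;
        final UnaryOperator<TextRecord> action;
        BusinessRule(Predicate<TextRecord> predicate, UnaryOperator<TextRecord> action) {
            this.predicate = predicate;
            this.action = action;
        }
    }

    // Collect the rules a record matches; an empty result means the record
    // can skip the enrichment side output and continue down the pipeline.
    static List<BusinessRule> matching(List<BusinessRule> rules, TextRecord record) {
        List<BusinessRule> hits = new ArrayList<>();
        for (BusinessRule rule : rules) {
            if (rule.predicate.test(record)) hits.add(rule);
        }
        return hits;
    }

    // Apply each matched rule's action in turn (what the RichAsyncFunction
    // does for records sent to the side output).
    static TextRecord applyAll(List<BusinessRule> matched, TextRecord record) {
        for (BusinessRule rule : matched) record = rule.action.apply(record);
        return record;
    }
}
```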

I have some new business rules which are more complicated and require sending the record's text field to different pre-trained NLP models for prediction, e.g. if a model predicts the text language is X then update field Y to that value, if another model predicts the sentiment is positive then set some other field to another value. I'm planning on using seldon-core to serve these pre-trained models, so they'll also be available in the k8s cluster.

I'm not sure about the best way to set up these model prediction calls in Flink. I could add in a new ProcessFunction in my pipeline before my existing enrichment-rule-predicate ProcessFunction and have it send the text to each of the prediction models and add the results for each one to the record so it's available for the enrichment step. The downside of this is that in the future I'm anticipating having more and more models, and not necessarily wanting to send each record to every model for prediction. e.g. I might have a business rule which says if the author of the text is X then get the sentiment (via the sentiment model) and update field Z, so it would be a waste of time doing that for all records.
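To illustrate the selective-dispatch idea, here is a simplified sketch where only the models some matched rule actually needs get called. The model clients are stubbed with fixed predictions so the sketch is self-contained; in reality each would be an HTTP call to a seldon-core endpoint:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class ModelDispatch {

    // Stand-ins for model-serving clients, keyed by model name.
    // Stubbed predictions keep the sketch runnable without a cluster.
    static final Map<String, Function<String, String>> MODELS = new HashMap<>();
    static {
        MODELS.put("language", text -> "en");
        MODELS.put("sentiment", text -> text.contains("great") ? "positive" : "negative");
    }

    // Invoke only the requested models and collect their predictions,
    // instead of sending every record to every model.
    static Map<String, String> predict(Iterable<String> neededModels, String text) {
        Map<String, String> results = new HashMap<>();
        for (String model : neededModels) {
            results.put(model, MODELS.get(model).apply(text));
        }
        return results;
    }
}
```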

I had a look at stateful functions. There's an example in the statefun.io overview which shows a stateful function doing a fraud model prediction based on whether an account has had X number of frauds detected in the last 30 days, so the key for the state is an account number. In my case, these model predictions don't really have any state: they just take input and return a prediction, so they're more like stateless lambda functions. Also, I was wondering: if I implemented these as stateful functions, would I be able to make them available to other Flink jobs within the cluster, as opposed to having them as individual RichAsyncFunctions defined within a single Flink job and only available to it? The last thing which made stateful functions sound good is that at the moment all my business rules happen to be orthogonal, but I can imagine in the future wanting one rule to be based on another; whereas regular dataflows have to be an acyclic graph, stateful functions could support that.

So, in summary:

  * Does this sound like a good use case for stateful functions?
  * Are stateful functions available to all Flink jobs within a cluster?


Thanks,
John.



Re: Stateful Functions + ML model prediction

Igal Shilman
Hi John,

Thank you for sharing your interesting use case!

Let me start with your second question:
Are stateful functions available to all Flink jobs within a cluster?

Yes: a remote function is just logic exposed behind an HTTP endpoint, and Flink forwards any message addressed to it via an HTTP request.
For every invocation, StateFun attaches the necessary context (any previous state for the key, plus the message itself) to the HTTP request.
So, practically speaking, the same remote function can be contacted by different jobs, since remote functions are effectively stateless.
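For illustration, in StateFun 2.x the endpoint for such a remote function is declared in a module.yaml along these lines (the function type, endpoint URL, and values here are hypothetical):

```yaml
version: "1.0"

module:
  meta:
    type: remote
  spec:
    functions:
      - function:
          meta:
            kind: http
            type: example/text-enricher          # hypothetical namespace/name
          spec:
            endpoint: http://text-enricher.default.svc:8000/statefun
            states: []                           # stateless in this use case
            maxNumBatchRequests: 500
            timeout: 2min
```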

 Does this sound like a good use case for stateful functions?

The way I would approach this is to move the business rules and the enrichment into the remote function.
This would:

a) Eliminate the need for a broadcast stream: you can simply deploy a new version of the remote function container, since remote functions can be restarted independently (without restarting the Flink job that contacts them).
b) Let you perform the enrichment immediately without going through a RichAsyncFunction, as StateFun by default invokes many remote functions in parallel (but never for the same key).
c) Let you contact the remote service that hosts the machine learning model, or even load the model into the remote function's process on startup.

So, in Kubernetes terms:

1. You would need a set of pods (a Deployment) that can serve HTTP traffic and expose a StateFun endpoint.
2. You would need a separate Deployment for Flink that runs a StateFun job.
3. The StateFun job would need to know how to contact these pods, so you would also need a Kubernetes Service (or a LoadBalancer) that balances the requests from (2) to (1).

If you need to change your business rules or the enrichment logic, you can simply roll out a new version of (1).
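A minimal Service for step (3) could look like this (names and ports are hypothetical; the selector must match the labels on the deployment in (1)):

```yaml
apiVersion: v1
kind: Service
metadata:
  name: statefun-endpoint        # hypothetical name
spec:
  selector:
    app: remote-function         # must match the remote-function pods' labels
  ports:
    - port: 8000                 # port the StateFun job contacts
      targetPort: 8000           # port the function container listens on
```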


Good luck,
Igal.


Re: Stateful Functions + ML model prediction

John Morrow
Thanks very much Igal - that sounds like a good solution!

I'm new to StateFun so I'll have to dig into it a bit more, but this sounds like a good direction.

Thanks again,
John.



Re: Stateful Functions + ML model prediction

John Morrow
Hi Flink Users,

I was watching Tzu-Li Tai's talk on stateful functions from Flink Forward (https://www.youtube.com/watch?v=tuSylBadNSo) which mentioned that Kafka & Kinesis are supported, and looking at https://repo.maven.apache.org/maven2/org/apache/flink/ I can see IO packages for those two: statefun-kafka-io & statefun-kinesis-io


Is it possible to use Apache Pulsar as a Statefun ingress & egress?

Thanks,
John.



Re: Stateful Functions + ML model prediction

Tzu-Li (Gordon) Tai
Hi John,

It is definitely possible to use Apache Pulsar with StateFun. Could you open a JIRA ticket for that?
It would be nice to see how much interest we can gather in adding it as a new IO module, and to consider native support for Pulsar in future releases.

If you are already using StateFun and want to use Pulsar as an ingress/egress with the current versions, there is also a way to do that right now.
If that's the case, please let me know and I'll try to provide some guidelines on how to achieve it.

Cheers,
Gordon



Re: Stateful Functions + ML model prediction

John Morrow
Thanks for the response, Gordon, and for that Flink Forward presentation; it's been very helpful.


I did find this page: https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/io-module/flink-connectors.html, and there are source/sink connectors for Pulsar (https://github.com/streamnative/pulsar-flink). I'm guessing that's how I should approach using Pulsar as an ingress/egress?
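From the flink-connectors docs, the bridge appears to be the SourceFunctionSpec/SinkFunctionSpec wrappers around an ordinary Flink SourceFunction/SinkFunction. A rough, non-compilable sketch (class names from the statefun-flink-io and pulsar-flink artifacts; constructor details are hypothetical and depend on the connector version):

```java
// Sketch only: assumes statefun-flink-io's SourceFunctionSpec bridge and a
// SourceFunction from the streamnative pulsar-flink connector.
IngressIdentifier<String> id =
    new IngressIdentifier<>(String.class, "example", "pulsar-ingress");

SourceFunction<String> pulsarSource = ...; // e.g. a FlinkPulsarSource built with
                                           // service URL, topic, and schema

IngressSpec<String> ingress = new SourceFunctionSpec<>(id, pulsarSource);
```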

Cheers,
John.


From: Tzu-Li (Gordon) Tai <[hidden email]>
Sent: Monday 5 October 2020 03:21
To: John Morrow <[hidden email]>; user <[hidden email]>
Subject: Re: Stateful Functions + ML model prediction
 
Hi John,

It is definitely possible to use Apache Pulsar with StateFun. Could you open a JIRA ticket for that?
It would be nice to see how much interest we can gather on adding that as a new IO module, and consider adding native support for Pulsar in future releases.

If you are already using StateFun and want to start using Pulsar as an ingress/egress already for current versions, there's also a way to do that right now.
If that's the case, please let me know and I'll try to provide some guidelines on how to achieve that.

Cheers,
Gordon


On Fri, Oct 2, 2020, 1:38 AM John Morrow <[hidden email]> wrote:
Hi Flink Users,

I was watching Tzu-Li Tai's talk on stateful functions from Flink Forward (https://www.youtube.com/watch?v=tuSylBadNSo) which mentioned that Kafka & Kinesis are supported, and looking at https://repo.maven.apache.org/maven2/org/apache/flink/ I can see IO packages for those two: statefun-kafka-io & statefun-kinesis-io


Is it possible to use Apache Pulsar as a Statefun ingress & egress?

Thanks,
John.


From: John Morrow <[hidden email]>
Sent: Wednesday 23 September 2020 11:37
To: Igal Shilman <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Stateful Functions + ML model prediction
 
Thanks very much Igal - that sounds like a good solution!

I'm new to StateFun so I'll have to dig into it a bit more, but this sounds like a good direction.

Thanks again,
John.


From: Igal Shilman <[hidden email]>
Sent: Wednesday 23 September 2020 09:06
To: John Morrow <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Stateful Functions + ML model prediction
 
Hi John,

Thank you for sharing your interesting use case!

Let me start from your second question:
Are stateful functions available to all Flink jobs within a cluster?

Yes — remote functions are logic exposed behind an HTTP endpoint, and Flink forwards any message addressed to them via an HTTP request.
The way StateFun works is that, for every invocation, StateFun attaches the necessary context (any previous state for the key, plus the message) to the HTTP request.
So, practically speaking, the same remote function can be contacted by different jobs, as remote functions are effectively stateless.
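To make that concrete, here is a minimal, hypothetical sketch in plain Python (not the actual StateFun SDK; the function and field names are all illustrative) of the request/reply shape: the per-key state travels with each invocation, so the serving process itself holds nothing between calls.

```python
def handle_invocation(state, message):
    """One StateFun-style invocation: the runtime supplies the per-key
    state along with the message, and takes the updated state back.
    The serving process itself keeps nothing between calls."""
    seen = state.get("seen_count", 0) + 1
    new_state = {**state, "seen_count": seen}
    reply = {"text": message["text"], "times_seen": seen}
    return new_state, reply

# Simulate the runtime owning the state across two calls for one key:
state = {}
state, reply = handle_invocation(state, {"text": "hello"})
state, reply = handle_invocation(state, {"text": "hello again"})
```

Because the state rides along with every request, any job (or any replica of the endpoint) can serve any invocation — which is what makes the same endpoint shareable across jobs.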

 Does this sound like a good use case for stateful functions?

The way I would approach this is to move the business rules and the enrichment into the remote function.
This would:

a) Eliminate the need for a broadcast stream: you can simply deploy a new version of the remote function container, as the containers can be restarted independently (without restarting the Flink job that contacts them).
b) Let you perform the enrichment directly, without going through a RichAsyncFunction, as StateFun by default invokes many remote functions in parallel (but never for the same key).
c) Let you contact the remote service that hosts the machine learning model, or even load the model into the remote function's process on startup.

So, in Kubernetes terms:

1. You would need a set of pods (a deployment) that can serve HTTP traffic and expose a StateFun endpoint.
2. You would need a separate deployment for Flink that runs a StateFun job.
3. The StateFun job would need to know how to contact these pods, so you would also need a Kubernetes service (or a LoadBalancer) that balances the requests from (2) to (1).
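To sketch the wiring in (3), here is a hedged example of a Kubernetes Service fronting the function pods, plus a remote-module definition pointing the StateFun job at it. Every name here (service, labels, port, function type) is illustrative, and the exact `module.yaml` schema varies between StateFun releases, so treat this as a shape rather than a copy-paste config:

```yaml
# --- Service fronting the function pods from (1); names/ports are examples ---
apiVersion: v1
kind: Service
metadata:
  name: rules-fn
spec:
  selector:
    app: rules-fn            # must match the labels on the function pods
  ports:
    - port: 8000
      targetPort: 8000
---
# --- module.yaml for the StateFun job in (2); 2.x-style schema assumed ---
version: "2.0"
module:
  meta:
    type: remote
  spec:
    functions:
      - function:
          meta:
            kind: http
            type: example/enrich          # illustrative function type
          spec:
            endpoint: http://rules-fn:8000/statefun
            states: []
            maxNumBatchRequests: 500
```

Rolling a new image for the `rules-fn` deployment then updates the business logic without touching the Flink job, which is the property Igal describes below the list.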

If you need to change your business rules or the enrichment logic, you can simply roll out a new version of (1).


Good luck,
Igal.

On Tue, Sep 22, 2020 at 10:22 PM John Morrow <[hidden email]> wrote:
Hi Flink Users,

I'm using Flink to process a stream of records containing a text field. The records are sourced from a message queue, enriched as they flow through the pipeline based on business rules and finally written to a database. We're using the Ververica platform so it's running on Kubernetes.

The initial business rules were straightforward, e.g. if field X contains a certain word then set field Y to a certain value. For the implementation I began by looking at https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html for inspiration. I ended up implementing a business rule as a Java class with a match-predicate & an action. The records enter the pipeline on a data stream which is joined with the rules in a broadcast stream and a ProcessFunction checks each record to see if it matches any rule predicates. If the record doesn't match any business rule predicates it continues on in the pipeline. If the record does match one or more business rule predicates it is sent to a side output with the list of business rules that it matched. The side output data stream goes through a RichAsyncFunction which loops through the matched rules and applies each one's action to the record. At the end, that enriched side-output record stream is unioned back with the non-enriched record stream. This all worked fine.
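As a language-neutral sketch of the rule shape described above (a match-predicate plus an action, with matching and enrichment as separate stages), here is a minimal illustration; all rule names and record fields are made up, and the real implementation is the Java classes in the pipeline:

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class Rule:
    name: str
    matches: Callable[[dict], bool]   # the match-predicate
    apply: Callable[[dict], dict]     # the enrichment action

RULES = [
    Rule(name="flag-urgent",
         matches=lambda r: "urgent" in r["text"].lower(),
         apply=lambda r: {**r, "priority": "high"}),
]

def process(record):
    # Matching stage (the broadcast-join ProcessFunction): collect matched rules.
    matched = [rule for rule in RULES if rule.matches(record)]
    # Enrichment stage (the side-output RichAsyncFunction): apply each action.
    for rule in matched:
        record = rule.apply(record)
    return record

enriched = process({"text": "URGENT: server down"})
```

Records matching no rule pass through `process` unchanged, mirroring the non-enriched stream that is unioned back at the end.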

I have some new business rules which are more complicated and require sending the record's text field to different pre-trained NLP models for prediction, e.g. if a model predicts the text language is X then update field Y to that value, if another model predicts the sentiment is positive then set some other field to another value. I'm planning on using seldon-core to serve these pre-trained models, so they'll also be available in the k8s cluster.

I'm not sure about the best way to set up these model prediction calls in Flink. I could add in a new ProcessFunction in my pipeline before my existing enrichment-rule-predicate ProcessFunction and have it send the text to each of the prediction models and add the results for each one to the record so it's available for the enrichment step. The downside of this is that in the future I'm anticipating having more and more models, and not necessarily wanting to send each record to every model for prediction. e.g. I might have a business rule which says if the author of the text is X then get the sentiment (via the sentiment model) and update field Z, so it would be a waste of time doing that for all records.
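One possible shape for that selective routing, sketched here with hypothetical stub models standing in for the seldon-core endpoints: each rule declares which model outputs its action needs, so a record only hits the models that some matching rule actually asks for.

```python
# Hypothetical stand-ins for remote model endpoints (e.g. served by seldon-core).
MODELS = {
    "language": lambda text: "en",
    "sentiment": lambda text: "positive" if "good" in text else "negative",
}

# Each rule names only the models it needs; all names here are illustrative.
RULES = [
    {"needs": ["sentiment"],
     "when": lambda rec: rec["author"] == "X",
     "then": lambda rec, preds: {**rec, "sentiment": preds["sentiment"]}},
]

def enrich(record):
    for rule in RULES:
        if not rule["when"](record):
            continue  # cheap predicate first: skip the model calls entirely
        # Call only the models this rule declares it needs.
        preds = {name: MODELS[name](record["text"]) for name in rule["needs"]}
        record = rule["then"](record, preds)
    return record

by_x = enrich({"author": "X", "text": "a good day"})
by_y = enrich({"author": "Y", "text": "a good day"})
```

With this shape, adding a new model is just a new entry in the registry; records whose rules never reference it are never sent to it.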

I had a look at stateful functions. There's an example in the statefun.io overview which shows a stateful function doing a fraud model prediction based on whether an account has had X frauds detected in the last 30 days, so the key for the state is an account number. In my case, these model predictions don't really have any state: they just take input and return a prediction, so they're more like a stateless lambda function. Also, I was wondering whether, if I implemented these as stateful functions, I would be able to make them available to other Flink jobs within the cluster, as opposed to having them as individual RichAsyncFunctions defined within a single Flink job and only available there. The last thing which made stateful functions sound good is that at the moment all my business rules happen to be orthogonal, but I can imagine in the future wanting one rule to be based on another, and whereas regular dataflows have to be an acyclic graph, stateful functions could support that.

So, in summary:

  * Does this sound like a good use case for stateful functions?
  * Are stateful functions available to all Flink jobs within a cluster?


Thanks,
John.



Re: Stateful Functions + ML model prediction

Tzu-Li (Gordon) Tai
Hi John,

Thanks a lot for opening the JIRA ticket! If you are interested in contributing that to StateFun, I'm also happy to guide you with the contribution.

On Mon, Oct 5, 2020 at 10:24 PM John Morrow <[hidden email]> wrote:
Thanks for the response Gordon, and that FlinkForward presentation - it's been very helpful.


I did find this page: https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/io-module/flink-connectors.html and there are source/sink connectors for Pulsar (https://github.com/streamnative/pulsar-flink) - I'm guessing that's how I should approach using Pulsar as an ingress/egress?

That is correct! The `SourceFunctionSpec` and `SinkFunctionSpec` are a means for users to bridge existing Flink sources and sinks to StateFun ingresses / egresses.

The downside to that approach is that, even if you're purely using remote functions, you'd still have to provide an embedded module to add ingresses / egresses this way.
Eventually (if several users request Pulsar) it would be best to have native support, as with Kinesis and Kafka, so that users can define them textually in `module.yaml` definition files, but the approach you pointed out definitely works for the time being.

Cheers,
Gordon
 
