Hi everyone,
What is the recommended way of achieving the equivalent of a Flink broadcast when using Stateful Functions? For instance, assume we are implementing something similar to Flink's demo fraud detection, but in Stateful Functions: how can one dynamically update the application's logic?

A similar question came up on this mailing list in the past, and the recommendation was to move the dynamic logic to a remote function so that it could be updated by deploying a new container. I don't think that is very realistic: updates might happen with a frequency that is not compatible with that approach (e.g., sticking to the fraud detection example, updating fraud detection rules every hour is not unusual), and one should not have to deploy a new container when data (not code) changes.

Is there a way of, for example, modifying FunctionProviders on the fly?

Thanks,
Miguel
|
Hi,

FWIW, there is this JIRA that is tracking a pubsub / broadcast messaging primitive in StateFun: https://issues.apache.org/jira/browse/FLINK-16319

This is probably what you are looking for. And I do agree, in the case that the control stream (which updates the application logic) is high volume, redeploying functions may not work well.

I don't think there really is a "recommended" way of doing the "broadcast control stream, join with main stream" pattern with StateFun at the moment, at least without FLINK-16319. On the other hand, it could be possible to use stateful functions to implement a pub-sub model in user space for the time being. I've actually left some ideas for implementing that in the comments of FLINK-16319.

Cheers,
Gordon

On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo <[hidden email]> wrote:
|
Hi Miguel,

I think that there are a couple of ways to achieve this, and it really depends on your specific use case and the trade-offs that you are willing to accept.

For example, one way to approach this:
- Suppose you have an external service somewhere that returns a representation of the logic to be interpreted by your function at runtime (I think that is the scenario you are describing).
- Then, you can write a background task (a thread) that periodically queries that service and keeps the latest version in memory.
- You can initialize this background task in your FunctionProvider implementation, or even in your StatefulModule if you wish.
- Then, make sure that your dynamic stateful function has access to the latest value fetched by your client (for example via a shared reference like a j.u.c.AtomicReference).
- Then, on receive, you can simply get that reference and re-apply your rules.

Take a look at [1] for example (it is not exactly the same, but I believe that it is close enough).

Good luck,
Igal.

On Mon, Feb 22, 2021 at 4:32 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
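The steps Igal lists can be sketched roughly as follows. This is only a minimal illustration in plain Java and deliberately does not use any StateFun classes: `RulesRefresher`, `RuleAwareFunction`, the `Supplier<String>` standing in for the external service, and the string representation of the rules are all hypothetical names chosen for the example. In a real module, the refresher would presumably be created once where the functions are provided, and its reference handed to each function instance.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical helper: polls a source periodically and keeps the latest value in memory. */
final class RulesRefresher implements AutoCloseable {
    private final Supplier<String> source;          // stand-in for the external service client
    private final AtomicReference<String> latest;   // shared with every function instance
    private final ScheduledExecutorService scheduler;

    RulesRefresher(Supplier<String> source, String initialRules) {
        this.source = source;
        this.latest = new AtomicReference<>(initialRules);
        // Daemon thread so the poller never keeps the JVM alive on its own.
        this.scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "rules-refresher");
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Starts polling immediately and then once per period. */
    void start(long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(this::refreshOnce, 0, period, unit);
    }

    /** Fetches once; on failure the previously fetched rules stay in place. */
    void refreshOnce() {
        try {
            latest.set(source.get());
        } catch (RuntimeException e) {
            // Swallowed on purpose: an uncaught exception would cancel the periodic task.
        }
    }

    AtomicReference<String> reference() {
        return latest;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}

/** Hypothetical stand-in for the stateful function that applies the rules. */
final class RuleAwareFunction {
    private final AtomicReference<String> rules;

    RuleAwareFunction(AtomicReference<String> rules) {
        this.rules = rules;
    }

    /** Reads the shared reference on every message, so new rules apply to the next message. */
    String onMessage(String event) {
        String current = rules.get();
        return event + " evaluated with " + current;
    }
}
```

Because the function only reads the reference when a message arrives, a refresh never interrupts a message that is already being processed. Note that the fetched value lives outside checkpointed state, so after a restart it is simply fetched again.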
|
Hi Gordon, Igal,

Thanks for your replies. PubSub would be a good addition; I have a few scenarios where that would be useful.

However, after reading your answers I realized that your proposed solutions (which address the most obvious interpretation of my question) do not necessarily solve my problem. I should have just stated what it was, instead of trying to propose a solution by discussing broadcast...

I'm trying to implement an "orchestrator" function which, given an event, will trigger multiple remote function calls, aggregate their results and eventually call yet more functions (based on a provided dependency graph). Hence, this orchestrator function has state per event_id and each function instance is short-lived (a couple of seconds at most, ideally sub-second). The question then is not about how to modify a long-running function instance (which PubSub would enable), but rather how to have the dependency graph available to new functions.

Given this, Igal's answer seems promising because we have the FunctionProvider instantiating a local variable and passing it down on every instantiation. I'm assuming there is one FunctionProvider per TaskManager. Is there an easy way to have the FunctionProvider receive data coming from a Flink DataStream, or receive StateFun messages? Otherwise, I could have it subscribe to a Kafka topic directly.

I really appreciate your help.

Miguel

Igal Shilman <[hidden email]> wrote on Monday, 22/02/2021 at 12:09:
|
Another possibility I am considering is handling this in Flink using a broadcast and adding all the information needed to the event itself. I'm a little concerned about the amount of data that will be serialized and sent on every request though, as I'll need to include information about all available remote functions, for instance.

Miguel Araújo <[hidden email]> wrote on Tuesday, 23/02/2021 at 09:14:
|
Hey Miguel, What you are describing is exactly what is implemented in this repo. The TransactionManager function acts as an orchestrator to work with the other functions. The repo is structured as an exercise but the full solution exists on the branch `advanced-solution`. On Tue, Feb 23, 2021 at 8:34 AM Miguel Araújo <[hidden email]> wrote:
|
Hi Seth,

Thanks for your comment. I've seen that repository in the past and it was really helpful to "validate" that this was the way to go. I think my question is not being addressed there though: how could one add dynamic behavior to your TransactionManager? In this case, that means state that is available to all TransactionManager instances when they receive a message of type Transaction for the first time.

Seth Wiesman <[hidden email]> wrote on Tuesday, 23/02/2021 at 16:02:
|
I don't think there is anything StateFun-specific here, and I would follow Igal's advice.

Let's say you have a state value called `Behavior` that describes the behavior of an instance. There is a default behavior, but any given instance may have a customized behavior. What I would do is the following. Create a state in the TransactionManager called `behavior` that stores the instance's customized behavior if it exists. When a transaction comes in, read the behavior state. If it exists (is not None in the case of Python), then use that. If not, then fall back to the default behavior.

The default can be provided in one of several ways, depending on the specifics of your use case:
1) Hard-coded in the function.
2) Dynamically loaded via a background thread as a global. So long as that default is immutable, this is safe.
3) Dynamically loaded via the function instance on first use. Stateful functions have strong support for making async requests, so you could simply query the behavior for that instance on first use from a 3rd party service.

Seth

On Tue, Feb 23, 2021 at 10:55 AM Miguel Araújo <[hidden email]> wrote:
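A rough sketch of the lookup order Seth describes (customized behavior first, shared default otherwise), combined with his option 2 for supplying the default. It is plain Java with no StateFun dependency: `BehaviorResolver` is a hypothetical name, the `HashMap` only stands in for the per-instance `behavior` state, and behaviors are reduced to strings for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration of "use the instance's behavior if set, else the default". */
final class BehaviorResolver {
    // Stand-in for the per-instance `behavior` state, keyed by function instance id.
    private final Map<String, String> customBehavior = new HashMap<>();

    // Shared default; a background thread may swap in a new immutable value at any time.
    private final AtomicReference<String> defaultBehavior;

    BehaviorResolver(AtomicReference<String> defaultBehavior) {
        this.defaultBehavior = defaultBehavior;
    }

    /** Stores a customized behavior for one instance. */
    void customize(String instanceId, String behavior) {
        customBehavior.put(instanceId, behavior);
    }

    /** Returns the instance's own behavior if present, otherwise the current default. */
    String resolve(String instanceId) {
        return Optional.ofNullable(customBehavior.get(instanceId))
                .orElseGet(defaultBehavior::get);
    }
}
```

Since the default is read at resolve time rather than copied into each instance, instances without a customization pick up a new default on their next message, while customized instances are unaffected.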
|
Thanks Seth.

I understood Igal's suggestion. My concern was about maintaining a separate service (outside Flink/StateFun) when this control stream might be an incremental stream as well (think of rules in fraud detection - this is not a fraud detection application, but the example is good). I wouldn't want to implement fault tolerance, checkpointing, HA, etc. myself.

I now see that I wasn't thinking a step ahead - just because it is a separate service from StateFun's point of view, it doesn't mean it can't be implemented in Flink if that turns out to be the most appropriate tool.

Thanks for all the suggestions, this was definitely helpful.

Miguel

Seth Wiesman <[hidden email]> wrote on Tuesday, 23/02/2021 at 17:08:
|