Re: Stateful function metrics

Posted by Igal Shilman on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Stateful-function-metrics-tp31852p31881.html

Hi Dan,

I think your approach makes sense, and there may be a case for either a built-in pub/sub mechanism (I was asked about something similar at Flink Forward this year) or for broadcasting a message to a pattern (amazon:*).

For the moment, assuming your key design is not going to change, I can imagine an efficient pub/sub mechanism implemented in “user land” as a stateful function. Having a single instance per topic would severely limit scalability, so I’d suggest mixing a finite number of tokens into the topic name (also called key salting). You would also need logic for assigning each subscriber to a token, and on publish you’d have to send the message once per token.
The number of tokens is up to you, but it is a function of your parallelism and the number of subscribers per topic (the number of participantIds per instrumentId).
(In your attached solution the number of tokens is 1.)
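
To make the salting idea concrete, here is a minimal plain-Java sketch. It does not use the Stateful Functions API, and the "topic#token" key layout, the class name, and the hash-based token assignment are all assumptions made for illustration; any stable assignment of subscribers to tokens would do.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of key salting for a pub/sub topic; no Flink API is used. */
public class SaltedTopic {

    /** Hypothetical key layout "topic#token", e.g. "AMAZON#2". */
    static String saltedKey(String topic, int token) {
        return topic + "#" + token;
    }

    /** Pins a subscriber to one token by hashing its id, so it always maps to the same token. */
    static int tokenFor(String subscriberId, int numTokens) {
        return Math.floorMod(subscriberId.hashCode(), numTokens);
    }

    /** The salted key a subscriber would send its subscription message to. */
    static String subscriptionKey(String topic, String subscriberId, int numTokens) {
        return saltedKey(topic, tokenFor(subscriberId, numTokens));
    }

    /** On publish, one copy of the message goes to every salted key of the topic. */
    static List<String> publishKeys(String topic, int numTokens) {
        List<String> keys = new ArrayList<>(numTokens);
        for (int token = 0; token < numTokens; token++) {
            keys.add(saltedKey(topic, token));
        }
        return keys;
    }

    public static void main(String[] args) {
        int numTokens = 4; // assumption: tune to parallelism and subscribers per topic
        System.out.println(subscriptionKey("AMAZON", "BankA", numTokens));
        System.out.println(publishKeys("AMAZON", numTokens));
    }
}
```

With numTokens set to 1 this collapses to a single instance per topic. Raising it spreads the subscriber lists over more instances, at the cost of one extra message per token on every publish.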

Happy new year,
Igal

On Monday, December 30, 2019, Dan Pettersson <[hidden email]> wrote:

Hi Igal, and thanks for your quick response. Yes, you got my second question right.

I'm building a small PoC around fraudulent trades and, in short, I've made the functions fine-grained,
down to the level participantId + "::" + instrumentId (i.e. "BankA::AMAZON").

In this flow of stock exchange messages, there are messages that tell the market
whether the instrument has opened, is being halted, or is being closed for the day.

These messages arrive at the instrumentId level, so I have to route them to all functions with the key participantId + "::" + (actual) instrumentId.
I had hoped to get a copy of all functions from the repository, loop through them, and dispatch, but I can't find a way to get hold of them.
Is there any way I can get them?

I haven't studied the core functionality enough, but could it be an option to open up the repository and return a copy of the
ObjectOpenHashMap that holds all the functions? I guess it's not a common requirement, so keeping them hidden is probably the best option.

As a workaround, I've created "Function listeners" where functions can subscribe to a certain type of message.

For example, FnIsClosingNotifier (keyed by instrumentId) holds a PersistedValue with all the function addresses
that subscribe to an instrument closing message. Other functions in the configuration subscribe by simply sending
a "Protobuf empty message", and when the closing message arrives, FnIsClosingNotifier dispatches it to the listeners.

Is there a better way that you can think of to implement this kind of requirement, where one message should be sent to
several subscribing functions that are not known beforehand?
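
For readers who cannot see the attached screenshots, the listener workaround described above can be modeled roughly as follows. This is a plain-Java sketch, not the code from the attachments: it uses no Stateful Functions API, an ordinary set stands in for the persisted listener addresses, and all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java model of the "function listener" workaround; no Flink API is used. */
public class ClosingNotifierModel {

    /** Stand-in for the persisted subscriber addresses of one notifier (one per instrumentId). */
    private final Set<String> listeners = new LinkedHashSet<>();

    /** Called when the empty subscription message arrives; remembers the sender's address. */
    void onSubscribe(String senderAddress) {
        listeners.add(senderAddress);
    }

    /** Called when the closing message arrives; returns the {target, message} pairs to dispatch. */
    List<String[]> onClosing(String closingMessage) {
        List<String[]> outgoing = new ArrayList<>();
        for (String listener : listeners) {
            outgoing.add(new String[] {listener, closingMessage});
        }
        return outgoing;
    }

    public static void main(String[] args) {
        ClosingNotifierModel amazon = new ClosingNotifierModel();
        amazon.onSubscribe("BankA::AMAZON");
        amazon.onSubscribe("BankB::AMAZON");
        amazon.onSubscribe("BankA::AMAZON"); // a repeated subscription is ignored by the set
        for (String[] out : amazon.onClosing("AMAZON is closing")) {
            System.out.println(out[0] + " <- " + out[1]);
        }
    }
}
```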

Below is some code that hopefully describes my current implementation to subscribe to a certain type of message.

First, the function that wants to be notified when the closing message arrives. This function has the id participantId::instrumentId.

image.png

And the notifier that holds all subscribers' addresses in the PersistedValue "listeners":

image.png

Regards
Dan

On Mon, Dec 30, 2019 at 00:50, Igal Shilman <[hidden email]> wrote:

Hi Dan,

You can learn more about Flink’s metrics system at [1] 
You can set up a reporter that exports the metrics to an external system, query the metrics via the REST API, or simply use Flink’s web UI to view them.

If I understand the second part of your question correctly, you have a persisted value in a base class and several different function types that derive from that base class, and you are wondering what the scope of that persisted value is.
If that is the question, then the scope is bound to the function address (type + id) and not to the Java instance.
So it is safe.
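
As a toy illustration of that scoping rule (this is only a model, not how the runtime actually stores state), imagine a store with one slot per type + id pair. The class names, the "type/id" slot key, and the counter are all made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of address-scoped state; this is not how the runtime stores state. */
public class AddressScopedState {

    /** One slot per (function type, id) pair, standing in for the persisted value. */
    private final Map<String, Integer> slots = new HashMap<>();

    /** Hypothetical base class: the counter logic is written once and inherited. */
    abstract class BaseFn {
        abstract String type();

        int countMessage(String id) {
            // The slot is selected by type + id, not by which Java object runs this method.
            return slots.merge(type() + "/" + id, 1, Integer::sum);
        }
    }

    class FnA extends BaseFn { String type() { return "FnA"; } }
    class FnB extends BaseFn { String type() { return "FnB"; } }

    public static void main(String[] args) {
        AddressScopedState runtime = new AddressScopedState();
        BaseFn a = runtime.new FnA();
        BaseFn b = runtime.new FnB();
        a.countMessage("BankA::AMAZON");
        System.out.println(a.countMessage("BankA::AMAZON")); // prints 2
        System.out.println(b.countMessage("BankA::AMAZON")); // prints 1
    }
}
```

Both subclasses inherit the same field declaration, yet FnA and FnB with the same id never see each other's value, because the slot is chosen by the address.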


Happy hacking,
Igal


On Sunday, December 29, 2019, Dan Pettersson <[hidden email]> wrote:
Hi all

I'm trying to get hold of some metrics from the functions that I have created but can't find a way to get them. The metrics I'm interested in are the ones mentioned here:
Any suggestions are appreciated.

I also have a question regarding "best practice" when dealing with functions that share business logic. Is it safe, storage-wise, to extend an abstract class that holds the persisted values?

Thanks in advance and Happy coding during the holidays :-)

Regards 
Dan