Stateful function metrics

Stateful function metrics

danp (Dan Pettersson), Sunday, December 29, 2019
Hi all

I'm trying to get hold of some metrics from the functions that I have created, but I can't find a way to get them. I'm interested in the metrics mentioned here:
Any suggestions are appreciated.

I also have a question regarding "best practice" when dealing with functions that share business logic. Is it safe, storage-wise, to extend an abstract class that holds the persisted values?

Thanks in advance, and happy coding during the holidays :-)

Regards 
Dan

Re: Stateful function metrics

Igal Shilman, Monday, December 30, 2019, 00:50
Hi Dan,

You can learn more about Flink’s metrics system at [1].
You can either set up a reporter that exports the metrics to an external system, query the metrics via the REST API, or simply use Flink’s web UI to view them.
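
As a rough illustration of the REST option, here is a minimal sketch that builds a request against the job-level metrics endpoint of Flink's monitoring REST API (`/jobs/<jobid>/metrics?get=...`). The class name, the base URL (assuming the default REST port 8081 on localhost), the job id, and the metric names are placeholders, not values from this thread. Whether the function metrics in question are exposed at job scope or at a finer scope is not stated here.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not part of Flink: it only builds the request.
final class FlinkMetricsQuery {

    /** Builds the URI for GET /jobs/<jobId>/metrics, optionally selecting metrics via ?get=a,b. */
    static URI jobMetricsUri(String baseUrl, String jobId, String... metricNames) {
        String query = metricNames.length == 0
                ? ""
                : "?get=" + String.join(",", metricNames);
        return URI.create(baseUrl + "/jobs/" + jobId + "/metrics" + query);
    }

    /** Wraps the URI in a GET request that a java.net.http.HttpClient can send. */
    static HttpRequest jobMetricsRequest(String baseUrl, String jobId, String... metricNames) {
        return HttpRequest.newBuilder(jobMetricsUri(baseUrl, jobId, metricNames)).GET().build();
    }
}
```

Sending the request with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` should return the selected metrics as JSON. Metric names containing characters that are not URI-safe would need encoding first, which this sketch does not do.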

If I understand the second part of your question correctly: you have a persisted value in a base class and a few different function types that derive from that base class, and you are wondering what the scope of that persisted value is?
If that is the question, then the scope is bound to the function address (type + id) and not to the Java instance.
So it is safe.
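
To make the scoping rule concrete, here is a toy model in plain Java. It does not use the Stateful Functions SDK: `ToyStateStore` is a hypothetical stand-in for the runtime's state handling, and all class names are made up for illustration. The point it shows is that a value declared once in a shared base class is still stored per (function type, id), so subclasses and ids do not see each other's state.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for the runtime's state: values are keyed by function type + id. */
final class ToyStateStore {
    private final Map<String, Integer> counters = new HashMap<>();

    /** Increments and returns the counter that belongs to the given address. */
    int increment(String functionType, String id) {
        return counters.merge(functionType + "/" + id, 1, Integer::sum);
    }
}

/** Shared business logic and the "persisted" counter are declared in the base class. */
abstract class BaseCountingFn {
    private final ToyStateStore store;

    BaseCountingFn(ToyStateStore store) {
        this.store = store;
    }

    abstract String functionType();

    /** Handles one message addressed to the given id and returns the updated count. */
    int invoke(String id) {
        return store.increment(functionType(), id);
    }
}

final class FnLargeTradeSketch extends BaseCountingFn {
    FnLargeTradeSketch(ToyStateStore store) { super(store); }
    @Override String functionType() { return "large-trade"; }
}

final class FnSmallTradeSketch extends BaseCountingFn {
    FnSmallTradeSketch(ToyStateStore store) { super(store); }
    @Override String functionType() { return "small-trade"; }
}
```

In this model, two different Java instances of `FnLargeTradeSketch` invoked with the same id share one counter, while `FnSmallTradeSketch` with the same id gets its own.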


Happy hacking,
Igal


Re: Stateful function metrics

danp (Dan Pettersson), Monday, December 30, 2019

Hi Igal, and thanks for your quick response. Yes, you got my second question right.

I'm building a small PoC around fraudulent trades and, in short, I've made the
functions fine-grained to the level participantId + "::" + instrumentId (e.g. "BankA::AMAZON").

In this flow of stock exchange messages, there are messages that tell the market
whether the instrument has opened, is being halted, or is being closed for the day.

These messages arrive at the instrumentId level, so I have to route them to all functions with the key participantId + "::" + (actual) instrumentId.
I had hoped to get a copy of all functions from the repository so that I could loop through them and dispatch, but I can't find a way to get hold of them.
Is there any way I can get them?

I haven't studied the core functionality enough, but could it be an option to open up the repository and return a copy of the
ObjectOpenHashMap that holds all the functions? I guess it's not a common requirement, so keeping them hidden is probably the best option.

As a workaround, I've created "function listeners", where functions can subscribe to a certain type of message.

For example, FnIsClosingNotifier (keyed by instrumentId) holds a PersistedValue with the addresses of all functions
that subscribe to an instrument closing message. Other functions subscribe in the configuration by just sending
a "Protobuf empty message", and when the closing message arrives, FnIsClosingNotifier dispatches it to the listeners.

Is there a better way you can think of to implement this kind of requirement, where one message should be sent to
several subscribing functions that are not known beforehand?

Below is some code that hopefully describes my current implementation for subscribing to a certain type of message.

The function that wants to be notified when the closing message arrives. This function has the id participantId::instrumentId.

[attachment: image.png]

And the notifier, which holds all subscriber addresses in the persisted value "listeners":

[attachment: image.png]
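
The attached screenshots are not available in this archive. As a rough stand-in, the notifier described above can be modelled in plain Java as follows. This is not Dan's code and does not use the Stateful Functions SDK: the class and method names are hypothetical, and subscriber addresses are reduced to plain id strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy model of one notifier instance, i.e. the instance keyed by a single instrumentId. */
final class ClosingNotifierSketch {
    // Plays the role of the persisted "listeners" value.
    private final Set<String> listeners = new LinkedHashSet<>();

    /** Called when a function sends its (empty) subscription message. */
    void subscribe(String subscriberId) {
        listeners.add(subscriberId);
    }

    void unsubscribe(String subscriberId) {
        listeners.remove(subscriberId);
    }

    /** Called when the closing message arrives: returns the ids to forward it to. */
    List<String> onClosing() {
        return new ArrayList<>(listeners);
    }
}
```

A set is used so that a function subscribing twice is only notified once.
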
Regards
Dan

Re: Stateful function metrics

Igal Shilman, Tuesday, December 31, 2019, 20:11
Hi Dan,

I think that your approach makes sense, and maybe there is a case for either a built-in pubsub mechanism (I was asked about something similar at Flink Forward this year) or broadcasting a message to a pattern (e.g. amazon:*).

For the moment, assuming your key design is not going to change, I can imagine an efficient pubsub mechanism implemented in “user land” as a stateful function. Having a single instance per topic would severely limit scalability, so I’d suggest mixing a finite number of tokens into the topic name (also called key salting). You would also have to have a logic of assigning a subscriber to a token, and on publish you’d have to supply the message for each token.
The number of tokens is up to you to decide, but it is a function of your parallelism and the number of subscribers per topic (the number of participantIds per instrumentId).
(In your attached solution the number of tokens is 1.)
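
A minimal sketch of the publish side of key salting, in plain Java without the Stateful Functions SDK. The class name, the method names, and the `#` separator between topic and token are assumptions made for illustration. The idea is that each token gets its own notifier id, and a publisher sends the message once per token.

```java
import java.util.ArrayList;
import java.util.List;

final class SaltedTopics {

    /** Id of the notifier instance that owns one token of a topic, e.g. "AMAZON#2". */
    static String saltedId(String topic, int token) {
        return topic + "#" + token;
    }

    /** On publish, the message has to be sent to every token of the topic. */
    static List<String> publishTargets(String topic, int numTokens) {
        List<String> ids = new ArrayList<>(numTokens);
        for (int token = 0; token < numTokens; token++) {
            ids.add(saltedId(topic, token));
        }
        return ids;
    }
}
```

With `numTokens` set to 1 this degenerates to a single notifier per topic, which matches the remark about the attached solution.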

Happy new year,
Igal

Re: Stateful function metrics

danp (Dan Pettersson), Wednesday, January 1, 2020
Hi again,

My current implementation of "pubsub" brings quite a lot of boilerplate, I must say, so the "broadcast by pattern" idea would be very useful for this specific requirement!
I'll look into the Stateful Functions code and see if I can come up with a suggestion.

I'm new to distributed computing and key salting, but it makes sense to partition the subscribers to increase the parallelism.
I don't fully understand this part of what you wrote, though: "You would also have to have a logic of assigning a subscriber to a token, and on publish you’d have to supply the message for each token."

Wouldn't it be as simple as adding the line below to my example in FnLargeTradeByParticipant:

context.send(FnInstrumentIsClosingNotifier.TYPE, trade.getInstrumentId() + (int)(Math.random()*x),  ......... ) ?

since the publisher is storing the subscribers' addresses?

Thanks,

Regards
Dan

Stateful function metrics

Igal Shilman
(re-adding the user mailing list)

I think that 100 subscribers is very manageable and doesn’t require any further partitioning.
I think the general rule of thumb should be to keep the state size per function instance “small”, but this is a case-by-case thing.

Thanks,
Igal.

On Thursday, January 2, 2020, Dan Pettersson <[hidden email]> wrote:
Aah yes, now I get it :-)

I mistakenly thought that the partitions I created of FnInstrumentIsClosingNotifier would automatically receive the closing message.
Now I realise that they need to be notified as well.

Are there any guidelines/recommendations on how to find a suitable partition size, taking for example the number of cores into consideration?
Or is it just good old experimenting and measuring performance that will show the ideal size?

For this case it would be around 5000 instruments with, let's say, 100 subscribers each.

Thanks for your help!

/Dan  

On Thursday, January 2, 2020, at 11:39, Igal Shilman <[hidden email]> wrote:
Hi Dan,

Let me clarify what I mean:
1. Registering a subscriber means sending a message to a partition of FnInstrumentIsClosingNotifier, so you need a
mapping from a subscriberId to a partition (this should be deterministic so that you can unregister the subscriber).
It can be as simple as subscriberId.hashCode() % numPartitions.

2. When you want to notify all the subscribers across all the partitions, you need to send the message to all the partitions, so that they in turn notify their subscribers.
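
Point 1 can be sketched as below. The class and method names are hypothetical. One detail worth flagging: in Java, `hashCode()` may be negative and `%` keeps the sign of the dividend, so this sketch uses `Math.floorMod` rather than the plain `%` from the message to keep the result in the range 0 to numPartitions - 1.

```java
final class SubscriberPartitioner {

    /**
     * Deterministically maps a subscriber to a partition in [0, numPartitions).
     * The same subscriberId always yields the same partition, so an
     * unregister message reaches the instance that holds the registration.
     */
    static int partitionFor(String subscriberId, int numPartitions) {
        return Math.floorMod(subscriberId.hashCode(), numPartitions);
    }
}
```

Because the mapping depends only on the subscriber id, a random salt chosen at subscribe time is not needed, and would make the registration hard to find again later.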

I hope that helps,
Igal.
