Restore metrics on broadcast state after restart

Gaël Renoux
Hi everyone

I have a KeyedBroadcastProcessFunction with a broadcast state (a bunch of rules), and I have set up a few gauge metrics on that state (things such as the number of known rules and the timestamp of the last rule received). However, I have an issue when the server restarts from a checkpoint or a savepoint: the metric values are not restored.

That is expected: the fields used in the metrics are transient, not part of the state (I have followed this doc: https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#metric-types). The fields will be reset to the proper value on the next call to processBroadcastElement(), but that's not enough for my use case: rule updates aren't that frequent (it could be minutes or even hours before the next one). We can't have the metrics offline for that long.

Is there any way to reset those fields without waiting for the next message to arrive? The open() method doesn't have access to the broadcast state, so I can't do it there. I could do it in processElement() (normal elements are much more frequent than rules), but it's far from ideal:
- it would be done again and again for every single element received, which is overkill;
- it could only update the metric on the current subtask, not the others, so one subtask could lag behind.

Am I missing something here? Is there any way to trigger a reset of the values when the broadcast state is reconstructed?

Thanks for any help,
Gaël Renoux

Re: Restore metrics on broadcast state after restart

Yun Tang
Hi Gaël

You can try initializeState [1] to initialize your metrics values from states when restoring from a checkpoint.

context.getOperatorStateStore().getBroadcastState() gives you access to your restored broadcast state.


Best
Yun Tang
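
A minimal sketch of the pattern suggested above, written as plain Java with stand-in types so that it runs without Flink on the classpath. In a real job, the function would presumably also implement CheckpointedFunction, and initializeFrom() below would correspond to initializeState(), reading the restored rules through context.getOperatorStateStore().getBroadcastState() with the same state descriptor used elsewhere in the function. The names RuleMetrics, initializeFrom and onRule are hypothetical, as is the assumption that each rule is stored together with its arrival timestamp.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of the pattern; no Flink dependency.
// All names here are hypothetical stand-ins for illustration.
final class RuleMetrics {
    // Like the transient fields in the original function: not part of
    // any checkpoint, so a fresh instance starts at zero after a restart.
    private transient int ruleCount;
    private transient long lastRuleTimestamp;

    // Stand-ins for gauges: evaluated lazily each time a reporter polls them.
    final Supplier<Integer> ruleCountGauge = () -> ruleCount;
    final Supplier<Long> lastRuleTimestampGauge = () -> lastRuleTimestamp;

    // Models initializeState(): recompute the fields from restored state.
    // Assumes the state maps rule id -> timestamp at which the rule arrived.
    void initializeFrom(Map<String, Long> restoredRules) {
        ruleCount = restoredRules.size();
        lastRuleTimestamp = 0L;
        for (long ts : restoredRules.values()) {
            lastRuleTimestamp = Math.max(lastRuleTimestamp, ts);
        }
    }

    // Models processBroadcastElement(): update state and fields together.
    void onRule(Map<String, Long> rules, String ruleId, long timestamp) {
        rules.put(ruleId, timestamp);
        ruleCount = rules.size();
        lastRuleTimestamp = Math.max(lastRuleTimestamp, timestamp);
    }
}

final class RestoreGaugeSketch {
    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>(); // survives the "restart"
        RuleMetrics beforeRestart = new RuleMetrics();
        beforeRestart.onRule(state, "rule-a", 100L);
        beforeRestart.onRule(state, "rule-b", 250L);

        RuleMetrics afterRestart = new RuleMetrics(); // transient fields lost
        afterRestart.initializeFrom(state);           // restored immediately
        System.out.println(afterRestart.ruleCountGauge.get());
        System.out.println(afterRestart.lastRuleTimestampGauge.get());
    }
}
```

Because the initialization hook runs once per subtask when state is restored, every subtask gets correct gauge values before any element or rule arrives, which avoids both drawbacks of refreshing the fields in processElement().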


From: Gaël Renoux <[hidden email]>
Sent: Tuesday, December 17, 2019 23:22
To: user <[hidden email]>
Subject: Restore metrics on broadcast state after restart
 
Re: Restore metrics on broadcast state after restart

Gaël Renoux
Thanks, that's exactly what I needed!

On Wed, Dec 18, 2019 at 5:44 PM Yun Tang <[hidden email]> wrote:
--
Gaël Renoux
Senior R&D Engineer, DataDome
M +33 6 76 89 16 52
E [hidden email]
W www.datadome.co
  