Dynamic configuration via broadcast state

vishalovercome
I have to make my Flink job dynamically configurable, and I'm thinking about using broadcast state. My current static job configuration file contains the configuration for the entire set of operators. I load it into a case class and then explicitly pass each operator its relevant configuration as constructor parameters.

1. Is there a way to guarantee that processElement isn't called before the first call to processBroadcastElement? How else can we ensure that each operator always starts with a valid configuration? Passing the same configuration as constructor parameters is one way to deal with it, but it's clumsy because that's just repetition of code. Loading the configuration in the open method is even worse because each operator would then have access to the entire job configuration.

I have seen suggestions to cache all elements received in processElement until the broadcast state is ready, but that's simply not an option given how high the rate of incoming elements is.

2. What can we do to make source and sink connectors dynamically
configurable?

Re: Dynamic configuration via broadcast state

vishalovercome
I researched a bit more, and another suggested solution is to build a custom
source function that somehow waits for each operator to load its
configuration, which is in fact set in the open method of the source itself.
I'm not sure that's a good idea, as it just exposes the entire job
configuration to an operator.

Can we leverage watermarks/idle sources somehow? Basically, set the timestamp
of the configuration stream to a very low number at the start and then force it
to be read before data from other sources starts flowing in. As
configurations aren't going to change frequently, we can idle these sources.

1. Is the above approach even possible?
2. Can an idle source resume once configuration changes?

A rough sketch of timestamp assignment and of re-activating an idle source
would help!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Dynamic configuration via broadcast state

Arvid Heise-4
Hi Vishal,

What you are trying to achieve is quite common and has its own documentation [1]. Currently, there is no way to hold back elements of the non-broadcast side (your question 1 in OP), so you have to buffer them until the configuration arrives.
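
A minimal, framework-free sketch of that buffering, written in plain Java so it runs without Flink on the classpath. `onConfig` stands in for processBroadcastElement and `onElement` for processElement; `ConfigGate`, `maxBuffered` and the `logic` callback are hypothetical names, not Flink API. In a real job the buffer would have to live in checkpointed Flink state, which this sketch does not model.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.function.BiFunction;

/**
 * Hypothetical model of "buffer until the first configuration arrives".
 * C = configuration type, E = input element type, O = output type.
 */
class ConfigGate<C, E, O> {
    private final BiFunction<C, E, O> logic;  // how a configuration is applied to one element
    private final int maxBuffered;            // hard cap so the buffer cannot grow without bound
    private final Queue<E> pending = new ArrayDeque<>();
    private C config;                         // null until the first configuration arrives

    ConfigGate(BiFunction<C, E, O> logic, int maxBuffered) {
        this.logic = Objects.requireNonNull(logic);
        this.maxBuffered = maxBuffered;
    }

    /** Data side: emit immediately if configured, otherwise hold the element back. */
    List<O> onElement(E element) {
        List<O> out = new ArrayList<>();
        if (config == null) {
            if (pending.size() >= maxBuffered) {
                throw new IllegalStateException("buffer full before first configuration");
            }
            pending.add(element);
        } else {
            out.add(logic.apply(config, element));
        }
        return out;
    }

    /** Config side: store the new configuration, then flush held-back elements in arrival order. */
    List<O> onConfig(C newConfig) {
        config = Objects.requireNonNull(newConfig);
        List<O> out = new ArrayList<>();
        while (!pending.isEmpty()) {
            out.add(logic.apply(config, pending.poll()));
        }
        return out;
    }
}
```

The cap is one way to make the high-input-rate concern explicit: if the configuration does not arrive before `maxBuffered` elements do, the gate fails loudly instead of exhausting memory. Whether failing, dropping, or spilling is the right reaction depends on the job.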

If you have several configurable operators, you could try to create a generic configuration holder and chain the actual operator to it [2], or you could create a base class that does all the work, so that you only override how the configuration is applied to the elements.
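
One possible reading of the "generic configuration holder" idea, sketched in plain Java without Flink. The holder receives job-wide configuration updates, keeps only the slice selected for one operator, and tags each element with that slice, so the chained operator never sees the whole job configuration. `ConfigHolder`, `Configured` and the `slice` selector are hypothetical names, and this is an assumption about the intent, not the approach described in [2].

```java
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

/** An element paired with the configuration slice that was current when it was seen. */
final class Configured<C, E> {
    final C config;
    final E element;

    Configured(C config, E element) {
        this.config = config;
        this.element = element;
    }
}

/**
 * Hypothetical generic holder.
 * J = job-wide configuration, C = slice for one operator, E = element type.
 */
class ConfigHolder<J, C, E> {
    private final Function<J, C> slice;  // picks this operator's part of the job configuration
    private C current;                   // null until the first job configuration arrives

    ConfigHolder(Function<J, C> slice) {
        this.slice = Objects.requireNonNull(slice);
    }

    /** Config side: keep only the slice this operator is allowed to see. */
    void onJobConfig(J jobConfig) {
        current = slice.apply(jobConfig);
    }

    /** Data side: tag the element; empty while unconfigured (buffering is left out here). */
    Optional<Configured<C, E>> onElement(E element) {
        if (current == null) {
            return Optional.empty();
        }
        return Optional.of(new Configured<>(current, element));
    }
}
```

The operator chained after the holder can then be an ordinary stateless function over `Configured<C, E>`, which keeps the configuration plumbing in one reusable place.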

For sources, you have to implement your own source; for sinks, you can use the same chaining trick.

I currently don't see how watermarks can help. We are still in the process of providing a way to synchronize sources with different timestamps automatically, and it will not arrive before Flink 1.14.

---

If configuration changes are quite rare, there is an easier option that is viable if your state is not huge: you could simply load the configuration statically in `open` and fail on a configuration change to trigger a recovery. That keeps the whole DataStream program simple at the cost of additional recoveries.
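
A rough, framework-free sketch of that "load in `open`, fail on change" idea in plain Java. It relies on the fact that an exception thrown from a user function fails the task, so that with a restart strategy configured the job recovers and `open` runs again. `ReloadOnFailureOperator`, `configSource` and `checkEvery` are hypothetical names; how the configuration is actually read (file, service, and so on) is an assumption left to the supplier.

```java
import java.util.Objects;
import java.util.function.Supplier;

/** Hypothetical operator that loads its configuration once and fails when it changes. */
class ReloadOnFailureOperator {
    private final Supplier<String> configSource;  // reads the current configuration from outside
    private final int checkEvery;                 // re-read the source once per this many elements
    private String loaded;                        // configuration captured in open()
    private int sinceCheck;

    ReloadOnFailureOperator(Supplier<String> configSource, int checkEvery) {
        this.configSource = Objects.requireNonNull(configSource);
        this.checkEvery = checkEvery;
    }

    /** Called once per (re)start: capture the configuration that is current right now. */
    void open() {
        loaded = configSource.get();
        sinceCheck = 0;
    }

    /** Processes one element; throws if the external configuration no longer matches. */
    String process(String element) {
        if (++sinceCheck >= checkEvery) {
            sinceCheck = 0;
            if (!configSource.get().equals(loaded)) {
                // In a Flink job this exception would fail the task and trigger a recovery.
                throw new IllegalStateException("configuration changed, failing to force a restart");
            }
        }
        return loaded + ":" + element;
    }
}
```

Checking only every `checkEvery` elements keeps the cost of re-reading the source low; a timer-based check would be an alternative when the input can go quiet for long periods.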


On Wed, Apr 7, 2021 at 12:37 AM vishalovercome <[hidden email]> wrote:
I researched a bit more, and another suggested solution is to build a custom
source function that somehow waits for each operator to load its
configuration, which is in fact set in the open method of the source itself.
I'm not sure that's a good idea, as it just exposes the entire job
configuration to an operator.

Can we leverage watermarks/idle sources somehow? Basically, set the timestamp
of the configuration stream to a very low number at the start and then force it
to be read before data from other sources starts flowing in. As
configurations aren't going to change frequently, we can idle these sources.

1. Is the above approach even possible?
2. Can an idle source resume once configuration changes?

A rough sketch of timestamp assignment and of re-activating an idle source
would help!


