Hi,
can someone elaborate on when I should make fields transient or non-transient within operators (e.g. map / flatMap / reduce)? I see these two possibilities:
(1) initialize a non-transient field from the constructor
(2) initialize a transient field inside a Rich*Function (e.g. RichMapFunction) when open(Configuration parameters) is invoked
On what criteria should I choose (1) or (2)? How is this related to checkpointing / rebalancing?
Thanks in advance,
Peter
Hi Peter,
there's no need to worry about transient members, as the operator itself is not serialized; only the state is, depending on the state back-end. If you want your state to be recovered from checkpoints, you should implement the open() method and initialise your state there, as in your point (2) and as described in [1].
If you want to re-scale your job, you have to take a savepoint and may resume from it with a different parallelism [2], but be sure to set a maximum parallelism (per job or per operator) and set UUIDs for operators as described in [3].

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/savepoints.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/production_ready.html

(Reply to Peter Ertl's message of Thursday, 3 August 2017 12:11:14 CEST.)
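Why a fixed maximum parallelism matters when resuming from a savepoint with a different parallelism can be sketched in plain Java. To the best of my knowledge, Flink assigns each key to one of maxParallelism key groups and gives each parallel instance a contiguous range of key groups; the sketch below roughly mirrors that scheme, but replaces Flink's murmur hash with a plain floorMod of hashCode(), and the names KeyGroupSketch, keyGroupFor, and operatorIndexFor are hypothetical.

```java
public class KeyGroupSketch {

    // Simplified key-group assignment. Flink additionally applies a murmur hash to
    // key.hashCode(); this sketch uses floorMod on hashCode() directly (an assumption).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each parallel instance owns a contiguous range of key groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int operatorIndexForKey(Object key, int maxParallelism, int parallelism) {
        return operatorIndexFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // chosen up front and kept across savepoints
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            System.out.printf("%s -> key group %d, instance %d at p=2, instance %d at p=4%n",
                key,
                keyGroupFor(key, maxParallelism),
                operatorIndexForKey(key, maxParallelism, 2),
                operatorIndexForKey(key, maxParallelism, 4));
        }
    }
}
```

A key's group depends only on maxParallelism, not on the current parallelism, so state stored per key group can be handed out in whole ranges when the job is resumed with a new parallelism. Changing maxParallelism itself would move keys between groups, which is why it should be fixed before the first savepoint.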
Hi Peter,
function objects (such as an instance of a class that implements MapFunction) that are used to construct a plan are serialized using Java serialization and shipped to the workers for execution.

(Reply to Nico Kruber's message of 2017-08-03 16:00 GMT+02:00.)
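The Java-serialization behaviour described above can be illustrated without a Flink dependency. This is a minimal sketch, assuming that shipping a function to a worker is approximated by an ObjectOutputStream/ObjectInputStream round trip; EnrichFunction, LookupClient, BrokenEnrichFunction, and the no-argument open() are hypothetical stand-ins for a user function, a non-serializable resource (such as a DB client), and RichFunction#open(Configuration).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    // Hypothetical resource that is NOT Serializable (think: DB connection, HTTP client).
    static class LookupClient {
        private final String prefix;
        LookupClient(String prefix) { this.prefix = prefix; }
        String lookup(String key) { return prefix + key; }
    }

    // Stand-in for a user function; Flink's function interfaces are Serializable too.
    static class EnrichFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String prefix;           // option (1): set in constructor, shipped with the object
        private transient LookupClient client; // option (2): skipped by serialization, rebuilt in open()

        EnrichFunction(String prefix) { this.prefix = prefix; }

        // Hypothetical analogue of RichFunction#open(Configuration), called on the worker.
        void open() { client = new LookupClient(prefix); }

        boolean isOpen() { return client != null; }

        String map(String value) { return client.lookup(value); }
    }

    // Same idea without 'transient': the non-serializable client must be shipped too.
    static class BrokenEnrichFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final LookupClient client;
        BrokenEnrichFunction(String prefix) { client = new LookupClient(prefix); }
    }

    // Serialize and deserialize, roughly what happens when the plan is shipped.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not ship " + obj.getClass().getSimpleName(), e);
        }
    }

    // True if the object survives the round trip (NotSerializableException is an IOException).
    static boolean isShippable(Object obj) {
        try {
            roundTrip(obj);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        EnrichFunction shipped = roundTrip(new EnrichFunction("db:"));
        System.out.println("open before open(): " + shipped.isOpen()); // false
        shipped.open();
        System.out.println(shipped.map("42"));                          // db:42
        System.out.println("broken shippable: "
            + isShippable(new BrokenEnrichFunction("db:")));            // false
    }
}
```

The constructor-set prefix (option (1)) survives the round trip, the transient client (option (2)) is null until open() recreates it on the receiving side, and a non-transient, non-serializable field makes the whole function impossible to ship.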
When running in HA mode or taking savepoints, if we pass configuration as constructor arguments, it seems that changing the configuration later doesn't work, because the older configuration is restored from state. How can we pass configuration while keeping the flexibility to change the values at a later date?
I've started another discussion with many more questions: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-application-configuration-when-restoring-from-checkpoint-savepoint-tp40189.html

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi,
if you would like to dynamically adjust the configuration of your streaming job, a good approach might be to treat the configuration as a stream itself. The connect() API can be used to connect a main stream with a control stream. In any case, the configuration should be persisted in state if it needs to be present after a restore. Otherwise, you need to implement logic in which the operator queries the latest configuration from some external system, which could become a bottleneck.

Regards,
Timo

(Reply to vishalovercome's message of 16.12.20 22:07.)
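The control-stream idea can be sketched without Flink. This is a plain-Java simulation, not the Flink API: Config, ThresholdFilter, processMain/processControl, and snapshot/restore are hypothetical names that mirror, respectively, the two inputs of a CoFlatMapFunction (flatMap1/flatMap2) on connected streams and the checkpoint/restore of operator state.

```java
import java.util.ArrayList;
import java.util.List;

public class ControlStreamSketch {

    // Hypothetical configuration message carried by the control stream.
    static final class Config {
        final int threshold;
        Config(int threshold) { this.threshold = threshold; }
    }

    // Shaped like a two-input function on connected streams; plain Java, not Flink.
    static final class ThresholdFilter {
        private Config config; // in Flink this would live in operator state

        ThresholdFilter(Config initial) { this.config = initial; }

        // Main stream: emit values at or above the current threshold.
        void processMain(int value, List<Integer> out) {
            if (value >= config.threshold) {
                out.add(value);
            }
        }

        // Control stream: replace the configuration at runtime.
        void processControl(Config update) { this.config = update; }

        // Stand-ins for taking a checkpoint/savepoint and restoring from it.
        Config snapshot() { return config; }
        void restore(Config saved) { this.config = saved; }
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        ThresholdFilter op = new ThresholdFilter(new Config(10));
        op.processMain(5, out);            // dropped (5 < 10)
        op.processMain(15, out);           // emitted
        op.processControl(new Config(3));  // new config arrives on the control stream
        op.processMain(5, out);            // emitted now (5 >= 3)
        Config saved = op.snapshot();

        // A fresh instance is built from the original constructor argument...
        ThresholdFilter restored = new ThresholdFilter(new Config(10));
        restored.restore(saved);           // ...but the restored state replaces it
        restored.processMain(4, out);      // emitted (4 >= 3)
        System.out.println(out);           // [15, 5, 4]
    }
}
```

The restored instance is constructed with the old threshold of 10, yet it keeps filtering with 3 because the latest configuration was kept in (simulated) state rather than only in a constructor argument.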