state inside functions


Peter Ertl
Hi,

Can someone elaborate on when I should make properties transient or non-transient within operators (e.g. map / flatMap / reduce)?

I see these two possibilities:

(1) initialize a non-transient property from the constructor
(2) initialize a transient property inside a Rich???Function when open(Configuration parameters) is invoked

On what criteria should I choose (1) or (2)?

How is this related to checkpointing / rebalancing?

Thanks in advance
Peter

Re: state inside functions

Nico Kruber
Hi Peter,
there's no need to worry about transient members as the operator itself is not
serialized - only the state itself, depending on the state back-end.

If you want your state to be recovered by checkpoints you should implement the
open() method and initialise your state there as in your point (2) and as
described in [1].

If you want to re-scale your job, you have to take a savepoint and may resume
from there with a different parallelism [2] but be sure to set a maximum
parallelism (per job / or operator) and set UUIDs for operators as described
in [3].


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/savepoints.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/production_ready.html

(In reply to Peter Ertl's message of Thursday, 3 August 2017, 12:11:14 CEST.)



Re: state inside functions

Fabian Hueske-2
Hi Peter,

Function objects (such as an instance of a class that implements MapFunction) that are used to construct a plan are serialized using Java serialization and shipped to the workers for execution.
Therefore, function classes must be Serializable. In general, it is recommended to configure function objects via the constructor. However, if you have a member property that does not implement Serializable, you should use a RichFunction, make the property transient, and initialize it in open().
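
To make the mechanics concrete, here is a minimal sketch in plain Java, without any Flink dependency. It assumes that shipping a function to a worker behaves like a Java serialization round trip. The names PrefixMapper, Tagger, ship() and isOpen() are hypothetical and exist only for this illustration; open() here merely mimics the RichFunction lifecycle hook and is not the Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientFieldDemo {
    public static void main(String[] args) {
        // "Ship" the function to a worker, then let the worker open and use it.
        PrefixMapper shipped = ship(new PrefixMapper("id-"));
        shipped.open();
        System.out.println(shipped.map("42")); // prints "id-42"
    }

    /** Java-serializes and deserializes the function, standing in for shipping it to a worker. */
    static PrefixMapper ship(PrefixMapper function) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PrefixMapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}

/** Hypothetical helper that deliberately does NOT implement Serializable. */
class Tagger {
    private final String prefix;

    Tagger(String prefix) { this.prefix = prefix; }

    String tag(String value) { return prefix + value; }
}

/** Stand-in for a rich function (assumption: not a real Flink class). */
class PrefixMapper implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String prefix;      // serializable configuration, set via the constructor
    private transient Tagger tagger;  // not serializable, so excluded from serialization

    PrefixMapper(String prefix) { this.prefix = prefix; }

    /** Called on the worker after deserialization: build the transient member. */
    void open() { tagger = new Tagger(prefix); }

    /** True once open() has initialized the transient member. */
    boolean isOpen() { return tagger != null; }

    String map(String value) { return tagger.tag(value); }
}
```

The constructor argument travels with the serialized object, while the transient member is null after deserialization until open() builds it on the receiving side.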

Alternatively, you can also override Java's serialization/deserialization methods and implement custom de/serialization logic.
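
A hedged sketch of that alternative, again in plain Java without Flink: the class defines the private writeObject/readObject hooks of Java serialization and writes the state of its non-serializable member by hand. SuffixMapper, Appender and copyViaSerialization() are hypothetical names used only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class CustomSerializationDemo {
    public static void main(String[] args) {
        SuffixMapper original = new SuffixMapper(new Appender("-eu"));
        SuffixMapper restored = copyViaSerialization(original);
        System.out.println(restored.map("order")); // prints "order-eu"
    }

    /** Round-trips the function through Java serialization. */
    static SuffixMapper copyViaSerialization(SuffixMapper function) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SuffixMapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}

/** Hypothetical member type that does NOT implement Serializable. */
class Appender {
    private final String suffix;

    Appender(String suffix) { this.suffix = suffix; }

    String suffix() { return suffix; }

    String append(String value) { return value + suffix; }
}

/** Stand-in for a function class (assumption: not a real Flink class). */
class SuffixMapper implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: default serialization skips it; the hooks below handle it manually.
    private transient Appender appender;

    SuffixMapper(Appender appender) { this.appender = appender; }

    /** Serialization hook: write default fields, then the Appender's state by hand. */
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(appender.suffix());
    }

    /** Deserialization hook: read default fields, then rebuild the Appender. */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        appender = new Appender(in.readUTF());
    }

    String map(String value) { return appender.append(value); }
}
```

With this variant the member is usable immediately after deserialization, at the cost of maintaining the read and write logic yourself.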

Best, Fabian



(In reply to Nico Kruber's message of 2017-08-03 16:00 GMT+02:00.)



Re: state inside functions

vishalovercome
When running in HA mode or taking savepoints, if we pass configuration as
constructor arguments, changing the configuration later does not seem to take
effect, because the older configuration is restored from state. How can we pass
configuration while keeping the flexibility to change the values at a later
date?

I've started another discussion with many more questions -
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-application-configuration-when-restoring-from-checkpoint-savepoint-tp40189.html




Re: state inside functions

Timo Walther
Hi,

If you would like to dynamically adjust the configuration of your streaming
job, it might be a good approach to treat the configuration as a stream
itself.

The connect() API can be used to connect a main stream with a control
stream. In any case, the configuration should be persisted in state if it
should be present after a restore. Otherwise, you need to implement logic
in which the operator queries the latest configuration from some external
system, which could become a bottleneck.
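
As a rough, dependency-free illustration of the control-stream idea (not Flink code): in Flink, a function applied to connected streams, for example a CoFlatMapFunction, has one callback per input (flatMap1 and flatMap2). The sketch below mirrors that shape with two hypothetical methods, onElement() and onControl(). The class ThresholdFilter and the threshold setting are assumptions made up for this example, and in a real job the current value would be kept in Flink state rather than in a plain field so that it survives a restore.

```java
import java.util.ArrayList;
import java.util.List;

class ControlStreamDemo {
    public static void main(String[] args) {
        System.out.println(run()); // prints "[12, 5]"
    }

    /** Interleaves data events and one configuration update, as two connected streams would. */
    static List<Integer> run() {
        ThresholdFilter filter = new ThresholdFilter(10);
        List<Integer> out = new ArrayList<>();
        filter.onElement(5, out);   // dropped: 5 is below the initial threshold 10
        filter.onElement(12, out);  // emitted
        filter.onControl(3);        // configuration update arrives on the control input
        filter.onElement(5, out);   // emitted: 5 is at least the new threshold 3
        return out;
    }
}

/** Hypothetical two-input operator: one method per input. */
class ThresholdFilter {
    // Assumption: a plain field for brevity; a real job would keep this in Flink state.
    private int threshold;

    ThresholdFilter(int initialThreshold) { this.threshold = initialThreshold; }

    /** Main input: emit values that reach the current threshold. */
    void onElement(int value, List<Integer> out) {
        if (value >= threshold) {
            out.add(value);
        }
    }

    /** Control input: replace the configuration without restarting the job. */
    void onControl(int newThreshold) {
        threshold = newThreshold;
    }
}
```

The initial value can still come from the constructor; updates arriving on the control input then override it at runtime.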

Regards,
Timo


(In reply to vishalovercome's message of 16.12.20 22:07.)