How to restore state from savepoint with flink SQL

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

How to restore state from savepoint with flink SQL

Yan Zhou [FDS Science] ­

Hi,


My application use flink SQL and it's running in production. How can i update my application with topology changes yet doesn't lose the state data?

Is there a way to assign UID to the operators that are translated from SQL? If not, is it intended and whats the rationality behind it? 


According to the flink document[1], UID has to be manually assigned to each operator.  Otherwise changing the topology will very likely change the UID, which in return messes up the state restoring from savepoint. 


Please advise.


[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#assigning-operator-ids



Reply | Threaded
Open this post in threaded view
|

Re: How to restore state from savepoint with flink SQL

Fabian Hueske-2
Hi,

At the moment, you can only restore a query from a savepoint if the query is not modified and the same Flink version is used.
Since SQL queries are automatically translated into data flows, it is not transparent to the user, which operators will be created.
We would need to expose an intermediate state after optimization and before the translation into DataStream operators to be able to assign operator UIDs (or also fine-tune the parallelism of operators).

For now, we are conservative and don't support this to prevent invalid reuse of state.
If you think about changing the query and restarting from a previous savepoint you should be aware that (at least some parts of) the results won't have proper semantics.

Updating SQL queries or migrating a queries to a new Flink version is a very challenging topic for which the community still needs to come up with a solution, design, and eventually implementations.

Best,
Fabian



2018-05-23 0:42 GMT+02:00 Yan Zhou [FDS Science] <[hidden email]>:

Hi,


My application use flink SQL and it's running in production. How can i update my application with topology changes yet doesn't lose the state data?

Is there a way to assign UID to the operators that are translated from SQL? If not, is it intended and whats the rationality behind it? 


According to the flink document[1], UID has to be manually assigned to each operator.  Otherwise changing the topology will very likely change the UID, which in return messes up the state restoring from savepoint. 


Please advise.


[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#assigning-operator-ids
Assigning Operator IDs. It is highly recommended that you adjust your programs as described in this section in order to be able to upgrade your programs in the future. The main required change is to manually specify operator IDs via the uid(String) method.




Reply | Threaded
Open this post in threaded view
|

Re: How to restore state from savepoint with flink SQL

Yan Zhou [FDS Science] ­

Thanks for the reply. 


If I only change query upstream and downstream operators, can I restore the query's state from a savepoint? It seems like the translated operators for a query have a auto-generated uid/hash, whose value depends on its location in the graph and its input/output.


Best

Yan


From: Fabian Hueske <[hidden email]>
Sent: Wednesday, May 23, 2018 3:18:08 AM
To: Yan Zhou [FDS Science]
Cc: [hidden email]
Subject: Re: How to restore state from savepoint with flink SQL
 
Hi,

At the moment, you can only restore a query from a savepoint if the query is not modified and the same Flink version is used.
Since SQL queries are automatically translated into data flows, it is not transparent to the user, which operators will be created.
We would need to expose an intermediate state after optimization and before the translation into DataStream operators to be able to assign operator UIDs (or also fine-tune the parallelism of operators).

For now, we are conservative and don't support this to prevent invalid reuse of state.
If you think about changing the query and restarting from a previous savepoint you should be aware that (at least some parts of) the results won't have proper semantics.

Updating SQL queries or migrating a queries to a new Flink version is a very challenging topic for which the community still needs to come up with a solution, design, and eventually implementations.

Best,
Fabian



2018-05-23 0:42 GMT+02:00 Yan Zhou [FDS Science] <[hidden email]>:

Hi,


My application use flink SQL and it's running in production. How can i update my application with topology changes yet doesn't lose the state data?

Is there a way to assign UID to the operators that are translated from SQL? If not, is it intended and whats the rationality behind it? 


According to the flink document[1], UID has to be manually assigned to each operator.  Otherwise changing the topology will very likely change the UID, which in return messes up the state restoring from savepoint. 


Please advise.


[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#assigning-operator-ids
Assigning Operator IDs. It is highly recommended that you adjust your programs as described in this section in order to be able to upgrade your programs in the future. The main required change is to manually specify operator IDs via the uid(String) method.




Reply | Threaded
Open this post in threaded view
|

Re: How to restore state from savepoint with flink SQL

Fabian Hueske-2
Hi Yan,

You are right. That's an unnecessary limitation at the moment and should be fixed.
The UIDs should only depend on the query itself and not on any preceding or subsequent operators.

The JIRA ticket to track the issue is FLINK-6966.

Best, Fabian

2018-05-24 0:34 GMT+02:00 Yan Zhou [FDS Science] <[hidden email]>:

Thanks for the reply. 


If I only change query upstream and downstream operators, can I restore the query's state from a savepoint? It seems like the translated operators for a query have a auto-generated uid/hash, whose value depends on its location in the graph and its input/output.


Best

Yan


From: Fabian Hueske <[hidden email]>
Sent: Wednesday, May 23, 2018 3:18:08 AM
To: Yan Zhou [FDS Science]
Cc: [hidden email]
Subject: Re: How to restore state from savepoint with flink SQL
 
Hi,

At the moment, you can only restore a query from a savepoint if the query is not modified and the same Flink version is used.
Since SQL queries are automatically translated into data flows, it is not transparent to the user, which operators will be created.
We would need to expose an intermediate state after optimization and before the translation into DataStream operators to be able to assign operator UIDs (or also fine-tune the parallelism of operators).

For now, we are conservative and don't support this to prevent invalid reuse of state.
If you think about changing the query and restarting from a previous savepoint you should be aware that (at least some parts of) the results won't have proper semantics.

Updating SQL queries or migrating a queries to a new Flink version is a very challenging topic for which the community still needs to come up with a solution, design, and eventually implementations.

Best,
Fabian



2018-05-23 0:42 GMT+02:00 Yan Zhou [FDS Science] <[hidden email]>:

Hi,


My application use flink SQL and it's running in production. How can i update my application with topology changes yet doesn't lose the state data?

Is there a way to assign UID to the operators that are translated from SQL? If not, is it intended and whats the rationality behind it? 


According to the flink document[1], UID has to be manually assigned to each operator.  Otherwise changing the topology will very likely change the UID, which in return messes up the state restoring from savepoint. 


Please advise.


[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#assigning-operator-ids
Assigning Operator IDs. It is highly recommended that you adjust your programs as described in this section in order to be able to upgrade your programs in the future. The main required change is to manually specify operator IDs via the uid(String) method.