How to ensure that job is restored from savepoint when using Flink SQL

shadowell

Hello everyone,
I have some questions about using Flink SQL. I hope someone can answer them or point me to where I can find the answer.
When using the DataStream API, you need to specify a uid for each operator so that the job can still restore its state from a savepoint after it has been modified. With Flink SQL, however, operator uids are generated automatically. If the SQL logic changes (e.g., the order of operators changes), will some operator state fail to be mapped back when the job is restored from a savepoint, resulting in state loss?

Thanks~
Jie Feng 

Re: How to ensure that job is restored from savepoint when using Flink SQL

Fabian Hueske-2
Hi Jie Feng,

As you said, Flink translates SQL queries into streaming programs with auto-generated operator IDs.
In order to start a SQL query from a savepoint, the operator IDs in the savepoint must match the IDs in the newly translated program.
Right now this can only be guaranteed if you translate the same query with the same Flink version (optimizer changes might change the structure of the resulting plan even if the query is the same).
This is, of course, a significant limitation that the community is aware of and plans to improve in the future.
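To make the matching requirement concrete, here is a toy Python model of how restored state is assigned to operators by ID. It is not Flink code: the savepoint is modeled as a plain dict from operator ID to state, and `restore` and `allow_non_restored` are hypothetical names. The flag loosely mirrors Flink's `--allowNonRestoredState` option, under which savepoint state without a matching operator is skipped instead of failing the restore; operators with no matching state start empty.

```python
from typing import Dict, List, Optional


def restore(savepoint: Dict[str, dict], new_operator_ids: List[str],
            allow_non_restored: bool = False) -> Dict[str, Optional[dict]]:
    """Toy model: assign savepoint state to the new job's operators by ID."""
    # State whose operator ID no longer exists cannot be assigned to anything.
    unmatched = sorted(set(savepoint) - set(new_operator_ids))
    if unmatched and not allow_non_restored:
        raise RuntimeError(f"savepoint state has no matching operator: {unmatched}")
    # Matched operators get their old state; new IDs start empty (None);
    # unmatched savepoint state is dropped.
    return {op_id: savepoint.get(op_id) for op_id in new_operator_ids}


old_savepoint = {"a1": {"count": 3}, "b2": {"sum": 42}}

# Same operator IDs: all state is restored.
print(restore(old_savepoint, ["a1", "b2"]))

# The second operator got a new ID ("c3"): the restore fails by default ...
try:
    restore(old_savepoint, ["a1", "c3"])
except RuntimeError as err:
    print(err)

# ... or, if non-restored state is allowed, that operator's old state is lost.
print(restore(old_savepoint, ["a1", "c3"], allow_non_restored=True))
```

In the last call, `c3` starts with `None` (empty state) while the state stored under `b2` is discarded, which is the state loss the original question asks about.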

I'd also like to add that it can be very difficult to assess whether it is meaningful to start a query from a savepoint that was generated with a different query.
A savepoint holds intermediate data that is needed to compute the result of a query.
If you update a query, it is quite possible that the result computed by Flink won't equal the actual result of the new query.
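A small worked example of how this can happen, written as a Python sketch under made-up assumptions: one key and one daily window, hypothetical rows of (age, salary) with ages modeled as integers, and a savepoint taken mid-window. The old query sums salaries for all ages; the new query only sums rows with `age >= 28`, but it resumes from the old query's partial SUM.

```python
# Hypothetical rows (age, salary) for one key ('001') and one daily window.
# The savepoint is taken after the first two rows.
rows = [(25, 100), (30, 200), (22, 300), (35, 400)]
SAVEPOINT_AT = 2


def window_sum(rows, keep):
    """SUM(salary) over the rows that pass the WHERE predicate `keep`."""
    return sum(salary for age, salary in rows if keep(age))


def old_filter(age):
    return True  # old query: no age restriction


def new_filter(age):
    return age >= 28  # new query: adds "age >= 28"


# Partial sum stored in the savepoint, computed under the old query.
restored_state = window_sum(rows[:SAVEPOINT_AT], old_filter)
# The new query resumes from that state and applies its own filter afterwards.
mixed_result = restored_state + window_sum(rows[SAVEPOINT_AT:], new_filter)
# What the new query would emit had it processed the whole window itself.
correct_result = window_sum(rows, new_filter)

print(mixed_result, correct_result)  # prints: 700 600
```

The salary of the 25-year-old was aggregated before the age restriction existed, so the restored job emits 700 instead of 600 for this window.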

Best, Fabian

In reply to shadowell's message of Mon, 6 July 2020, 10:50.

Re: How to ensure that job is restored from savepoint when using Flink SQL

shadowell

Hi Fabian,

Thanks for the information!
Actually, I am not clear about how operator IDs are auto-generated in Flink SQL, or about how operator state is mapped back when restoring from a savepoint.
I hope to get some more detailed information using the example below.

I have two SQL queries as examples:
old SQL: select id, name, sum(salary) from user_info where id = '001' group by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;
new SQL: select id, name, sum(salary) from user_info where id = '001' and age >= '28' group by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;
The new query only adds an age restriction. Now I want to switch the job from the old query to the new one by triggering a savepoint. Flink will generate operator IDs for the operators of the new query.
In this case, purely from a technical point of view, can the operator IDs in the savepoint of the old SQL job match the operator IDs of the new SQL job?
My understanding is that Flink will reorder the operators and generate new IDs for them, and the new IDs may not match the old ones.
As a result, some state would fail to be mapped back from the old job's savepoint, which naturally leads to inaccurate results.
Is my understanding correct?

Thanks~ 
Jie

In reply to Fabian Hueske's message of 7 July 2020, 17:23.

Re: How to ensure that job is restored from savepoint when using Flink SQL

Fabian Hueske-2
Hi Jie,

The auto-ID generation is not done by the SQL translation component but at a lower level, i.e., it is independent of Flink's SQL translation.
The ID generation depends only on the topology / graph structure of the program's operators.
The ID of an operator depends on the IDs of its predecessors (and not on its own processing logic or operator name).
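The idea that an ID comes from an operator's position in the graph rather than from its logic can be illustrated with a toy Python model. This is a simplification, not Flink's actual hashing algorithm (which also accounts for details such as traversal order and operator chaining); the linear plan shapes, the `operator_ids` helper, and the use of SHA-256 as a stand-in hash are all assumptions for illustration. Each ID hashes only the operator's position and its predecessor's ID; the operator's kind and logic are deliberately left out.

```python
import hashlib
from typing import List, Tuple

# A linear plan: (operator kind, operator logic). Hypothetical shapes.
Plan = List[Tuple[str, str]]


def operator_ids(plan: Plan) -> List[str]:
    """Toy model: ID = hash(position in the graph, predecessor's ID).

    The operator's kind and logic are NOT hashed, mirroring the statement
    that the ID does not depend on processing logic or operator name.
    """
    ids: List[str] = []
    for position, _operator in enumerate(plan):
        predecessor = ids[-1] if ids else ""
        ids.append(hashlib.sha256(f"{position}|{predecessor}".encode()).hexdigest())
    return ids


old_plan = [("source", "user_info"), ("filter", "id = '001'"),
            ("window", "TUMBLE 1 DAY, SUM(salary)"), ("sink", "")]
# Extending the existing filter predicate keeps the graph shape.
new_plan = [("source", "user_info"), ("filter", "id = '001' AND age >= '28'"),
            ("window", "TUMBLE 1 DAY, SUM(salary)"), ("sink", "")]
# A hypothetical earlier plan without any filter operator.
no_filter_plan = [("source", "user_info"),
                  ("window", "TUMBLE 1 DAY, SUM(salary)"), ("sink", "")]

# Same shape, different filter logic: every ID is identical.
print(operator_ids(old_plan) == operator_ids(new_plan))
# Inserting a filter shifts the window operator's position and predecessor,
# so its ID in no_filter_plan differs from its ID in old_plan.
print(operator_ids(no_filter_plan)[1] == operator_ids(old_plan)[2])
```

In this model the first comparison is always true, because the IDs depend only on the number and order of operators; the second compares the window operator's ID across the two shapes, which differ in the window's position and predecessor.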

So, as long as the operator graph structure of a program remains the same, it will be compatible with an earlier savepoint.
However, preserving the operator graph structure is out of the user's control.
The operator graph is automatically generated by the SQL optimizer, and slight changes to a query can result in a different graph while other changes do not affect the structure.

In your example, the graph structure should remain the same because the first query already has a Filter operator (due to "where id = '001'") and the second query just extends the filter predicate ("id = '001' and age >= '28'").
If the first query had no WHERE clause, the plan might have changed.
To reason about which query changes are savepoint-compatible, you need in-depth knowledge of the optimizer's translation process.

I would not rely on being able to start a query from a savepoint of a (slightly) modified query:
first, because it is very fragile given the query translation process, and second, because it can produce incorrect results.

Given your example query, I would start it from scratch and add a predicate so that it continues after the latest result of the previous query:

select id, name, sum(salary) from user_info where id = '001' and age >= '28' and rowtime >= TIMESTAMP 'xxx' group by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;

If the last result of the first query was for '2020-07-07', I would set xxx to 2020-07-08 00:00:00.000.
Of course, this only works for queries with hard temporal boundaries (such as tumbling windows), but it gives correct results.
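For tumbling windows, choosing the boundary is mechanical: it is the end of the last window the old query emitted, i.e., that window's start plus the window size. A minimal Python sketch, assuming one-day windows aligned to midnight (as with `TUMBLE(rowtime, INTERVAL '1' DAY)`) and that the start of the last emitted window is known; `restart_boundary` and `restart_predicate` are hypothetical helpers, and the output uses a standard SQL `TIMESTAMP` literal.

```python
from datetime import datetime, timedelta

WINDOW_SIZE = timedelta(days=1)  # matches TUMBLE(rowtime, INTERVAL '1' DAY)


def restart_boundary(last_window_start: datetime) -> datetime:
    """First instant not covered by the last window the old query emitted."""
    return last_window_start + WINDOW_SIZE


def restart_predicate(last_window_start: datetime) -> str:
    """Render the extra WHERE condition for the restarted query."""
    boundary = restart_boundary(last_window_start)
    # SQL timestamp literal with millisecond precision.
    millis = boundary.microsecond // 1000
    literal = boundary.strftime("%Y-%m-%d %H:%M:%S.") + f"{millis:03d}"
    return f"rowtime >= TIMESTAMP '{literal}'"


print(restart_predicate(datetime(2020, 7, 7)))
# prints: rowtime >= TIMESTAMP '2020-07-08 00:00:00.000'
```

Using `>=` on the window end keeps the new query's first window exactly adjacent to the old query's last one, so no day is counted twice or skipped.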

Best, Fabian

In reply to shadowell's message of Wed, 8 July 2020, 04:50.

Re: How to ensure that job is restored from savepoint when using Flink SQL

shadowell

Hi Fabian,


Thanks for your reply; it helps a lot.


Best Regards,

Jie


In reply to Fabian Hueske's message of 8 July 2020, 18:17.