How to recover state from savepoint on embedded mode?

Reo Lei
Hi,
I have a job that needs to run in embedded mode, but it must initialize some rule data from a database before it starts. So I used the State Processor API to construct the state data and saved it to the local disk. When I tried to use this savepoint to recover my job, I found that resuming a job from a savepoint requires the command `bin/flink run -s :savepointPath [:runArgs]`, which submits the job to a Flink cluster. That means the job runs in remote mode, not embedded mode.

So I am wondering why I can't resume a job from a savepoint in embedded mode. If it is possible, what should I do?
BTW, if a job cannot be resumed from a savepoint in embedded mode, how can I verify in a development environment that the savepoint is constructed correctly, and debug it in IDEA?

BR,
Reo

Re: How to recover state from savepoint on embedded mode?

Yun Tang

What does "embedded mode" mean here? If you mean SQL embedded mode, resuming from a savepoint is not supported at the moment; if you mean a local standalone cluster, you can use `bin/flink run -s` to resume on the local cluster.

Best,
Yun Tang

Fwd: How to recover state from savepoint on embedded mode?

Reo Lei


---------- Forwarded message ---------
From: Reo Lei <[hidden email]>
Date: Tue, Nov 26, 2019 at 9:53 AM
Subject: Re: How to recover state from savepoint on embedded mode?
To: Yun Tang <[hidden email]>


Hi Yun,
Thanks for your reply. By "embedded mode" I mean that the whole Flink cluster and job, including the JobManager, the TaskManager, and the job application itself, run within a single local JVM process, using the "LocalStreamEnvironment" within the job. The start command looks like this: "java -Xmx512M -XX:... -Dlog.file=... -cp flink-job.jar com.a.b.c.MyJob > /dev/null &"

The reason I am not using standalone mode is that the runtime environment does not have ZooKeeper, and we will not install it. So I have to rely on embedded mode to run my job.

BR,
Reo

Re: How to recover state from savepoint on embedded mode?

Congxian Qiu
Hi,

You can recover from a checkpoint/savepoint as long as the JobManager and TaskManager can read from the given path, no matter which mode the job is running in.
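
In embedded mode everything runs in one JVM, so a quick pre-flight check from that JVM can catch a wrong or unreadable path before the job starts. Below is a minimal sketch, assuming the savepoint sits on the local filesystem. `SavepointPathCheck` and `looksReadable` are hypothetical names, not part of Flink. It only confirms that the directory contains a readable, non-empty `_metadata` file (the metadata file Flink writes into a savepoint directory); it does not validate the state inside.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical pre-flight helper; not part of the Flink API. */
final class SavepointPathCheck {

    private SavepointPathCheck() {}

    /**
     * Returns true if savepointDir is a readable directory that contains
     * a readable, non-empty "_metadata" file.
     */
    static boolean looksReadable(Path savepointDir) throws IOException {
        if (!Files.isDirectory(savepointDir) || !Files.isReadable(savepointDir)) {
            return false;
        }
        Path metadata = savepointDir.resolve("_metadata");
        return Files.isRegularFile(metadata)
                && Files.isReadable(metadata)
                && Files.size(metadata) > 0;
    }
}
```

Calling `SavepointPathCheck.looksReadable(Paths.get(savepointPath))` at the top of the job's `main` method, and failing fast when it returns false, is one way to use it.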

Best,
Congxian


Re: How to recover state from savepoint on embedded mode?

Arvid Heise-3
Just to add: if you use LocalStreamEnvironment, you can pass a configuration and set "execution.savepoint.path" to point to your savepoint.

Best,

Arvid

Re: How to recover state from savepoint on embedded mode?

Dawid Wysakowicz-2

Hi,

I would like to clarify the previous responses a bit.

1. From an architectural point of view, yes, it is possible to restore from a savepoint in a local JVM, as long as that JVM has access to the savepoint.

2. Unfortunately, the configuration you pass to the constructor of LocalStreamEnvironment is not passed on to the StreamGraphGenerator, which sets the savepoint configuration. So, unless I am wrong, this approach will not work.

3. There is no easy, officially supported way to do this. The official way would be to start a local cluster and submit your job remotely to that local cluster, which you can also debug remotely.

I know this is not perfect. A different workaround I can offer would be to modify/reuse the LocalStreamEnvironment a bit.

You can

  1.  get a StreamGraph from a StreamExecutionEnvironment (via StreamExecutionEnvironment#getStreamGraph),
  2.  generate a JobGraph out of it,
  3.  set the savepoint settings
  4.  and submit it locally to a MiniCluster.

You can reuse the majority of the code from the LocalStreamEnvironment#execute(StreamGraph) method. The one thing you have to add, once you have the jobGraph, is:

jobGraph.setSavepointRestoreSettings(...)

I know this is not the nicest solution, but some of my colleagues are currently working on improving the job submission API. (Some of the FLIPs around the topic are: https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission and https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API).

Best,

Dawid

Re: How to recover state from savepoint on embedded mode?

Biao Liu
Hi Reo,

Maybe we could find another way.

> The reason I am not using standalone mode is that the runtime environment does not have ZooKeeper, and we will not install it. So I have to rely on embedded mode to run my job.

You could set up a standalone cluster without ZooKeeper.
Either leave "high-availability" unset in flink-conf.yaml or set it to "NONE", and provide "jobmanager.rpc.address" and "jobmanager.rpc.port" in flink-conf.yaml as well.
This way, you can build a standalone cluster; see [1] for more details.
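
As a rough sketch of the settings described above, a flink-conf.yaml fragment for a single-machine standalone cluster without ZooKeeper might look like the following. The host `localhost` is an assumption for a one-machine setup; 6123 is Flink's default JobManager RPC port.

```yaml
# flink-conf.yaml (fragment)

# No ZooKeeper: leave high-availability unset, or set it explicitly to NONE.
high-availability: NONE

# Address and port of the JobManager RPC endpoint.
# "localhost" is an assumption for a single-machine setup.
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
```

Once the cluster has been started with `bin/start-cluster.sh`, the job can be resumed with the `bin/flink run -s :savepointPath [:runArgs]` command quoted in the original question.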

Would that satisfy your requirement?
