Challenges Deploying Flink With Savepoints On Kubernetes

classic Classic list List threaded Threaded
30 messages Options
12
Reply | Threaded
Open this post in threaded view
|

Challenges Deploying Flink With Savepoints On Kubernetes

Sean Hester
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 

Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Yuval Itzchakov
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Best Regards,
Yuval Itzchakov.
Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Hao Sun
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Best Regards,
Yuval Itzchakov.
Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Yuval Itzchakov
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Best Regards,
Yuval Itzchakov.
Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Hao Sun
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Best Regards,
Yuval Itzchakov.
Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Sean Hester
thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:


the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <[hidden email]> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Best Regards,
Yuval Itzchakov.


--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 

Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Vijay Bhaskar
One of the way you should do is, have a separate cluster job manager program in kubernetes, which is actually managing jobs. So that you can decouple the job control. While restarting the job, make sure to follow the below steps:

a) First job manager takes save point by killing the job and notes down the save point path by using the save point rest api
b) After  that job manager starts the new job by supplying the save point path. So that it starts from the latest save point. 

So that you no need to rely on yaml configuration.

Also above steps helps only for manual restart of the flink job. 
There are another 2 cases possible:

case 1 => Your job restarts by it self with the help of flink cluster, then latest check point is going to take care of the job state, no need to worry about
case 2 => Your job is failed. Then state is lost. To overcome this, as per the documentation best thing is: Take periodic save points. So that while restarting the job from crashes,
provide the argument of latest save point path  as argument to your job manager program.

So the key is, have a seprate job manager of flink jobs so that you will have the flexibility

Regards
Bhaskar


On Wed, Sep 25, 2019 at 6:38 PM Sean Hester <[hidden email]> wrote:
thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:


the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <[hidden email]> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Best Regards,
Yuval Itzchakov.


--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 

Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Aleksandar Mastilovic
In reply to this post by Sean Hester
Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <[hidden email]> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:


the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <[hidden email]> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Best Regards,
Yuval Itzchakov.


--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 


Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Yun Tang
As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.


Best
Yun Tang

From: Aleksandar Mastilovic <[hidden email]>
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester <[hidden email]>
Cc: Hao Sun <[hidden email]>; Yuval Itzchakov <[hidden email]>; user <[hidden email]>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes
 
Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <[hidden email]> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:


the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <[hidden email]> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Best Regards,
Yuval Itzchakov.


--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 


Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Aleksandar Mastilovic
Would you guys (Flink devs) be interested in our solution for zookeeper-less HA? I could ask the managers how they feel about open-sourcing the improvement.

On Sep 25, 2019, at 11:49 AM, Yun Tang <[hidden email]> wrote:

As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.


Best
Yun Tang

From: Aleksandar Mastilovic <[hidden email]>
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester <[hidden email]>
Cc: Hao Sun <[hidden email]>; Yuval Itzchakov <[hidden email]>; user <[hidden email]>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes
 
Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <[hidden email]> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:


the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <[hidden email]> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



-- 
Best Regards,
Yuval Itzchakov.


-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 

Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Yang Wang
Hi, Aleksandar

Savepoint option in standalone job cluster is optional. If you want to always recover 
from the latest checkpoint, just as Aleksandar and Yun Tang said you could use the 
high-availability configuration. Make sure the cluster-id is not changed, i think the job 
could recover both at exceptionally crash and restart by expectation.

[hidden email], we are also have an zookeeper-less high-availability implementation[1].
Maybe we could have some discussion and contribute this useful feature to the community.


Best,
Yang

Aleksandar Mastilovic <[hidden email]> 于2019年9月26日周四 上午4:11写道:
Would you guys (Flink devs) be interested in our solution for zookeeper-less HA? I could ask the managers how they feel about open-sourcing the improvement.

On Sep 25, 2019, at 11:49 AM, Yun Tang <[hidden email]> wrote:

As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.


Best
Yun Tang

From: Aleksandar Mastilovic <[hidden email]>
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester <[hidden email]>
Cc: Hao Sun <[hidden email]>; Yuval Itzchakov <[hidden email]>; user <[hidden email]>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes
 
Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <[hidden email]> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:


the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <[hidden email]> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



-- 
Best Regards,
Yuval Itzchakov.


-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 

Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Sean Hester
thanks to everyone for all the replies.

i think the original concern here with "just" relying on the HA option is that there are some disaster recovery and data center migration use cases where the continuity of the job managers is difficult to preserve. but those are admittedly very edgy use cases. i think it's definitely worth reviewing the SLAs with our site reliability engineers to see how likely it would be to completely lose all job managers under an HA configuration. that small a risk might be acceptable/preferable to a one-off solution.

@Aleksander, would love to learn more about Zookeeper-less HA. i think i spotted a thread somewhere between Till and someone (perhaps you) about that. feel free to DM me.

thanks again to everyone!

On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <[hidden email]> wrote:
Hi, Aleksandar

Savepoint option in standalone job cluster is optional. If you want to always recover 
from the latest checkpoint, just as Aleksandar and Yun Tang said you could use the 
high-availability configuration. Make sure the cluster-id is not changed, i think the job 
could recover both at exceptionally crash and restart by expectation.

[hidden email], we are also have an zookeeper-less high-availability implementation[1].
Maybe we could have some discussion and contribute this useful feature to the community.


Best,
Yang

Aleksandar Mastilovic <[hidden email]> 于2019年9月26日周四 上午4:11写道:
Would you guys (Flink devs) be interested in our solution for zookeeper-less HA? I could ask the managers how they feel about open-sourcing the improvement.

On Sep 25, 2019, at 11:49 AM, Yun Tang <[hidden email]> wrote:

As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.


Best
Yun Tang

From: Aleksandar Mastilovic <[hidden email]>
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester <[hidden email]>
Cc: Hao Sun <[hidden email]>; Yuval Itzchakov <[hidden email]>; user <[hidden email]>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes
 
Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <[hidden email]> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:


the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <[hidden email]> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



-- 
Best Regards,
Yuval Itzchakov.


-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 

Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Vijay Bhaskar
Suppose my cluster got crashed and need to bring up the entire cluster back? Does HA still helps to run the cluster from latest save point? 

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <[hidden email]> wrote:
thanks to everyone for all the replies.

i think the original concern here with "just" relying on the HA option is that there are some disaster recovery and data center migration use cases where the continuity of the job managers is difficult to preserve. but those are admittedly very edgy use cases. i think it's definitely worth reviewing the SLAs with our site reliability engineers to see how likely it would be to completely lose all job managers under an HA configuration. that small a risk might be acceptable/preferable to a one-off solution.

@Aleksander, would love to learn more about Zookeeper-less HA. i think i spotted a thread somewhere between Till and someone (perhaps you) about that. feel free to DM me.

thanks again to everyone!

On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <[hidden email]> wrote:
Hi, Aleksandar

Savepoint option in standalone job cluster is optional. If you want to always recover 
from the latest checkpoint, just as Aleksandar and Yun Tang said you could use the 
high-availability configuration. Make sure the cluster-id is not changed, i think the job 
could recover both at exceptionally crash and restart by expectation.

[hidden email], we are also have an zookeeper-less high-availability implementation[1].
Maybe we could have some discussion and contribute this useful feature to the community.


Best,
Yang

Aleksandar Mastilovic <[hidden email]> 于2019年9月26日周四 上午4:11写道:
Would you guys (Flink devs) be interested in our solution for zookeeper-less HA? I could ask the managers how they feel about open-sourcing the improvement.

On Sep 25, 2019, at 11:49 AM, Yun Tang <[hidden email]> wrote:

As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.


Best
Yun Tang

From: Aleksandar Mastilovic <[hidden email]>
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester <[hidden email]>
Cc: Hao Sun <[hidden email]>; Yuval Itzchakov <[hidden email]>; user <[hidden email]>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes
 
Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <[hidden email]> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:


the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <[hidden email]> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



-- 
Best Regards,
Yuval Itzchakov.


-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 

Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Vijay Bhaskar
I don't think HA will help to recover from cluster crash, for that we should take periodic savepoint right? Please correct me in case i am wrong

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <[hidden email]> wrote:
Suppose my cluster got crashed and need to bring up the entire cluster back? Does HA still helps to run the cluster from latest save point? 

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <[hidden email]> wrote:
thanks to everyone for all the replies.

i think the original concern here with "just" relying on the HA option is that there are some disaster recovery and data center migration use cases where the continuity of the job managers is difficult to preserve. but those are admittedly very edgy use cases. i think it's definitely worth reviewing the SLAs with our site reliability engineers to see how likely it would be to completely lose all job managers under an HA configuration. that small a risk might be acceptable/preferable to a one-off solution.

@Aleksander, would love to learn more about Zookeeper-less HA. i think i spotted a thread somewhere between Till and someone (perhaps you) about that. feel free to DM me.

thanks again to everyone!

On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <[hidden email]> wrote:
Hi, Aleksandar

Savepoint option in standalone job cluster is optional. If you want to always recover 
from the latest checkpoint, just as Aleksandar and Yun Tang said you could use the 
high-availability configuration. Make sure the cluster-id is not changed, i think the job 
could recover both at exceptionally crash and restart by expectation.

[hidden email], we are also have an zookeeper-less high-availability implementation[1].
Maybe we could have some discussion and contribute this useful feature to the community.


Best,
Yang

Aleksandar Mastilovic <[hidden email]> 于2019年9月26日周四 上午4:11写道:
Would you guys (Flink devs) be interested in our solution for zookeeper-less HA? I could ask the managers how they feel about open-sourcing the improvement.

On Sep 25, 2019, at 11:49 AM, Yun Tang <[hidden email]> wrote:

As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.


Best
Yun Tang

From: Aleksandar Mastilovic <[hidden email]>
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester <[hidden email]>
Cc: Hao Sun <[hidden email]>; Yuval Itzchakov <[hidden email]>; user <[hidden email]>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes
 
Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <[hidden email]> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:


the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <[hidden email]> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



-- 
Best Regards,
Yuval Itzchakov.


-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 

Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Sean Hester
Vijay,

That is my understanding as well: the HA solution only solves the problem up to the point all job managers fail/restart at the same time. That's where my original concern was. 

But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers per cluster--as long as they are all deployed to separate GKE nodes--would provide a very high uptime/low failure rate, at least on paper. It's a promising enough option that we're going to run in HA for a month or two and monitor results before we put in any extra work to customize the savepoint start-up behavior.

On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <[hidden email]> wrote:
I don't think HA will help to recover from cluster crash, for that we should take periodic savepoint right? Please correct me in case i am wrong

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <[hidden email]> wrote:
Suppose my cluster got crashed and need to bring up the entire cluster back? Does HA still helps to run the cluster from latest save point? 

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <[hidden email]> wrote:
thanks to everyone for all the replies.

i think the original concern here with "just" relying on the HA option is that there are some disaster recovery and data center migration use cases where the continuity of the job managers is difficult to preserve. but those are admittedly very edgy use cases. i think it's definitely worth reviewing the SLAs with our site reliability engineers to see how likely it would be to completely lose all job managers under an HA configuration. that small a risk might be acceptable/preferable to a one-off solution.

@Aleksander, would love to learn more about Zookeeper-less HA. i think i spotted a thread somewhere between Till and someone (perhaps you) about that. feel free to DM me.

thanks again to everyone!

On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <[hidden email]> wrote:
Hi, Aleksandar

Savepoint option in standalone job cluster is optional. If you want to always recover 
from the latest checkpoint, just as Aleksandar and Yun Tang said you could use the 
high-availability configuration. Make sure the cluster-id is not changed, i think the job 
could recover both at exceptionally crash and restart by expectation.

[hidden email], we are also have an zookeeper-less high-availability implementation[1].
Maybe we could have some discussion and contribute this useful feature to the community.


Best,
Yang

Aleksandar Mastilovic <[hidden email]> 于2019年9月26日周四 上午4:11写道:
Would you guys (Flink devs) be interested in our solution for zookeeper-less HA? I could ask the managers how they feel about open-sourcing the improvement.

On Sep 25, 2019, at 11:49 AM, Yun Tang <[hidden email]> wrote:

As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.


Best
Yun Tang

From: Aleksandar Mastilovic <[hidden email]>
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester <[hidden email]>
Cc: Hao Sun <[hidden email]>; Yuval Itzchakov <[hidden email]>; user <[hidden email]>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes
 
Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <[hidden email]> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:


the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <[hidden email]> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



-- 
Best Regards,
Yuval Itzchakov.


-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 

Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Yang Wang
Hi Vijay,

If you are using HA solution, i think you do not need to specify the savepoint. Instead the checkpoint is used.
The checkpoint is done automatically and periodically based on your configuration.When the 
jobmanager/taskmanager fails or the whole cluster crashes, it could always recover from the latest
checkpoint. Does this meed your requirement?

Best,
Yang

Sean Hester <[hidden email]> 于2019年10月1日周二 上午1:47写道:
Vijay,

That is my understanding as well: the HA solution only solves the problem up to the point all job managers fail/restart at the same time. That's where my original concern was. 

But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers per cluster--as long as they are all deployed to separate GKE nodes--would provide a very high uptime/low failure rate, at least on paper. It's a promising enough option that we're going to run in HA for a month or two and monitor results before we put in any extra work to customize the savepoint start-up behavior.

On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <[hidden email]> wrote:
I don't think HA will help to recover from cluster crash, for that we should take periodic savepoint right? Please correct me in case i am wrong

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <[hidden email]> wrote:
Suppose my cluster got crashed and need to bring up the entire cluster back? Does HA still helps to run the cluster from latest save point? 

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <[hidden email]> wrote:
thanks to everyone for all the replies.

i think the original concern here with "just" relying on the HA option is that there are some disaster recovery and data center migration use cases where the continuity of the job managers is difficult to preserve. but those are admittedly very edgy use cases. i think it's definitely worth reviewing the SLAs with our site reliability engineers to see how likely it would be to completely lose all job managers under an HA configuration. that small a risk might be acceptable/preferable to a one-off solution.

@Aleksander, would love to learn more about Zookeeper-less HA. i think i spotted a thread somewhere between Till and someone (perhaps you) about that. feel free to DM me.

thanks again to everyone!

On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <[hidden email]> wrote:
Hi, Aleksandar

Savepoint option in standalone job cluster is optional. If you want to always recover 
from the latest checkpoint, just as Aleksandar and Yun Tang said you could use the 
high-availability configuration. Make sure the cluster-id is not changed, i think the job 
could recover both at exceptionally crash and restart by expectation.

[hidden email], we are also have an zookeeper-less high-availability implementation[1].
Maybe we could have some discussion and contribute this useful feature to the community.


Best,
Yang

Aleksandar Mastilovic <[hidden email]> 于2019年9月26日周四 上午4:11写道:
Would you guys (Flink devs) be interested in our solution for zookeeper-less HA? I could ask the managers how they feel about open-sourcing the improvement.

On Sep 25, 2019, at 11:49 AM, Yun Tang <[hidden email]> wrote:

As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.


Best
Yun Tang

From: Aleksandar Mastilovic <[hidden email]>
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester <[hidden email]>
Cc: Hao Sun <[hidden email]>; Yuval Itzchakov <[hidden email]>; user <[hidden email]>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes
 
Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <[hidden email]> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:


the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <[hidden email]> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



-- 
Best Regards,
Yuval Itzchakov.


-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 

Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Vijay Bhaskar
Thanks Yang and Sean. I have couple of questions:

1) Suppose the scenario of , bringing back entire cluster,
     a) In that case, at least one job manager out of HA group should be up and running right? or
     b) All the job managers fails, then also this works? In that case please let me know the procedure/share the documentation? 
         How to start from previous check point?
         What Flink version onwards this feature is stable?

Regards
Bhaskar


On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <[hidden email]> wrote:
Hi Vijay,

If you are using HA solution, i think you do not need to specify the savepoint. Instead the checkpoint is used.
The checkpoint is done automatically and periodically based on your configuration.When the 
jobmanager/taskmanager fails or the whole cluster crashes, it could always recover from the latest
checkpoint. Does this meed your requirement?

Best,
Yang

Sean Hester <[hidden email]> 于2019年10月1日周二 上午1:47写道:
Vijay,

That is my understanding as well: the HA solution only solves the problem up to the point all job managers fail/restart at the same time. That's where my original concern was. 

But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers per cluster--as long as they are all deployed to separate GKE nodes--would provide a very high uptime/low failure rate, at least on paper. It's a promising enough option that we're going to run in HA for a month or two and monitor results before we put in any extra work to customize the savepoint start-up behavior.

On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <[hidden email]> wrote:
I don't think HA will help to recover from cluster crash, for that we should take periodic savepoint right? Please correct me in case i am wrong

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <[hidden email]> wrote:
Suppose my cluster got crashed and need to bring up the entire cluster back? Does HA still helps to run the cluster from latest save point? 

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <[hidden email]> wrote:
thanks to everyone for all the replies.

i think the original concern here with "just" relying on the HA option is that there are some disaster recovery and data center migration use cases where the continuity of the job managers is difficult to preserve. but those are admittedly very edgy use cases. i think it's definitely worth reviewing the SLAs with our site reliability engineers to see how likely it would be to completely lose all job managers under an HA configuration. that small a risk might be acceptable/preferable to a one-off solution.

@Aleksander, would love to learn more about Zookeeper-less HA. i think i spotted a thread somewhere between Till and someone (perhaps you) about that. feel free to DM me.

thanks again to everyone!

On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <[hidden email]> wrote:
Hi, Aleksandar

Savepoint option in standalone job cluster is optional. If you want to always recover 
from the latest checkpoint, just as Aleksandar and Yun Tang said you could use the 
high-availability configuration. Make sure the cluster-id is not changed, i think the job 
could recover both at exceptionally crash and restart by expectation.

[hidden email], we are also have an zookeeper-less high-availability implementation[1].
Maybe we could have some discussion and contribute this useful feature to the community.


Best,
Yang

Aleksandar Mastilovic <[hidden email]> 于2019年9月26日周四 上午4:11写道:
Would you guys (Flink devs) be interested in our solution for zookeeper-less HA? I could ask the managers how they feel about open-sourcing the improvement.

On Sep 25, 2019, at 11:49 AM, Yun Tang <[hidden email]> wrote:

As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.


Best
Yun Tang

From: Aleksandar Mastilovic <[hidden email]>
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester <[hidden email]>
Cc: Hao Sun <[hidden email]>; Yuval Itzchakov <[hidden email]>; user <[hidden email]>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes
 
Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <[hidden email]> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:


the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <[hidden email]> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



-- 
Best Regards,
Yuval Itzchakov.


-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 

Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Yang Wang
@ Hao Sun,
I have made a confirmation that even we change parallelism and/or modify operators, add new operators,
the flink cluster could also recover from latest checkpoint.

@ Vijay
a) Some individual jobmanager/taskmanager crashed exceptionally(someother jobmanagers 
and taskmanagers are alive), it could recover from the latest checkpoint.
b) All jobmanagers and taskmanagers fails, it could still recover from the latest checkpoint if the cluster-id
is not changed. 

When we enable the HA, The meta of jobgraph and checkpoint is saved on zookeeper and the real files are save
on high-availability storage(HDFS). So when the flink application is submitted again with same cluster-id, it could
recover jobs and checkpoint from zookeeper. I think it has been supported for a long time. Maybe you could have a
try with flink-1.8 or 1.9.

Best,
Yang


Vijay Bhaskar <[hidden email]> 于2019年10月10日周四 下午2:26写道:
Thanks Yang and Sean. I have couple of questions:

1) Suppose the scenario of , bringing back entire cluster,
     a) In that case, at least one job manager out of HA group should be up and running right? or
     b) All the job managers fails, then also this works? In that case please let me know the procedure/share the documentation? 
         How to start from previous check point?
         What Flink version onwards this feature is stable?

Regards
Bhaskar


On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <[hidden email]> wrote:
Hi Vijay,

If you are using HA solution, i think you do not need to specify the savepoint. Instead the checkpoint is used.
The checkpoint is done automatically and periodically based on your configuration.When the 
jobmanager/taskmanager fails or the whole cluster crashes, it could always recover from the latest
checkpoint. Does this meed your requirement?

Best,
Yang

Sean Hester <[hidden email]> 于2019年10月1日周二 上午1:47写道:
Vijay,

That is my understanding as well: the HA solution only solves the problem up to the point all job managers fail/restart at the same time. That's where my original concern was. 

But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers per cluster--as long as they are all deployed to separate GKE nodes--would provide a very high uptime/low failure rate, at least on paper. It's a promising enough option that we're going to run in HA for a month or two and monitor results before we put in any extra work to customize the savepoint start-up behavior.

On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <[hidden email]> wrote:
I don't think HA will help to recover from cluster crash, for that we should take periodic savepoint right? Please correct me in case i am wrong

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <[hidden email]> wrote:
Suppose my cluster got crashed and need to bring up the entire cluster back? Does HA still helps to run the cluster from latest save point? 

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <[hidden email]> wrote:
thanks to everyone for all the replies.

i think the original concern here with "just" relying on the HA option is that there are some disaster recovery and data center migration use cases where the continuity of the job managers is difficult to preserve. but those are admittedly very edgy use cases. i think it's definitely worth reviewing the SLAs with our site reliability engineers to see how likely it would be to completely lose all job managers under an HA configuration. that small a risk might be acceptable/preferable to a one-off solution.

@Aleksander, would love to learn more about Zookeeper-less HA. i think i spotted a thread somewhere between Till and someone (perhaps you) about that. feel free to DM me.

thanks again to everyone!

On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <[hidden email]> wrote:
Hi, Aleksandar

Savepoint option in standalone job cluster is optional. If you want to always recover 
from the latest checkpoint, just as Aleksandar and Yun Tang said you could use the 
high-availability configuration. Make sure the cluster-id is not changed, i think the job 
could recover both at exceptionally crash and restart by expectation.

[hidden email], we are also have an zookeeper-less high-availability implementation[1].
Maybe we could have some discussion and contribute this useful feature to the community.


Best,
Yang

Aleksandar Mastilovic <[hidden email]> 于2019年9月26日周四 上午4:11写道:
Would you guys (Flink devs) be interested in our solution for zookeeper-less HA? I could ask the managers how they feel about open-sourcing the improvement.

On Sep 25, 2019, at 11:49 AM, Yun Tang <[hidden email]> wrote:

As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.


Best
Yun Tang

From: Aleksandar Mastilovic <[hidden email]>
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester <[hidden email]>
Cc: Hao Sun <[hidden email]>; Yuval Itzchakov <[hidden email]>; user <[hidden email]>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes
 
Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <[hidden email]> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:


the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <[hidden email]> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



-- 
Best Regards,
Yuval Itzchakov.


-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 

Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Vijay Bhaskar
Thanks Yang. We will try and let you know if any issues arise

Regards
Bhaskar

On Thu, Oct 10, 2019 at 1:53 PM Yang Wang <[hidden email]> wrote:
@ Hao Sun,
I have made a confirmation that even we change parallelism and/or modify operators, add new operators,
the flink cluster could also recover from latest checkpoint.

@ Vijay
a) Some individual jobmanager/taskmanager crashed exceptionally(someother jobmanagers 
and taskmanagers are alive), it could recover from the latest checkpoint.
b) All jobmanagers and taskmanagers fails, it could still recover from the latest checkpoint if the cluster-id
is not changed. 

When we enable the HA, The meta of jobgraph and checkpoint is saved on zookeeper and the real files are save
on high-availability storage(HDFS). So when the flink application is submitted again with same cluster-id, it could
recover jobs and checkpoint from zookeeper. I think it has been supported for a long time. Maybe you could have a
try with flink-1.8 or 1.9.

Best,
Yang


Vijay Bhaskar <[hidden email]> 于2019年10月10日周四 下午2:26写道:
Thanks Yang and Sean. I have couple of questions:

1) Suppose the scenario of , bringing back entire cluster,
     a) In that case, at least one job manager out of HA group should be up and running right? or
     b) All the job managers fails, then also this works? In that case please let me know the procedure/share the documentation? 
         How to start from previous check point?
         What Flink version onwards this feature is stable?

Regards
Bhaskar


On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <[hidden email]> wrote:
Hi Vijay,

If you are using HA solution, i think you do not need to specify the savepoint. Instead the checkpoint is used.
The checkpoint is done automatically and periodically based on your configuration.When the 
jobmanager/taskmanager fails or the whole cluster crashes, it could always recover from the latest
checkpoint. Does this meed your requirement?

Best,
Yang

Sean Hester <[hidden email]> 于2019年10月1日周二 上午1:47写道:
Vijay,

That is my understanding as well: the HA solution only solves the problem up to the point all job managers fail/restart at the same time. That's where my original concern was. 

But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers per cluster--as long as they are all deployed to separate GKE nodes--would provide a very high uptime/low failure rate, at least on paper. It's a promising enough option that we're going to run in HA for a month or two and monitor results before we put in any extra work to customize the savepoint start-up behavior.

On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <[hidden email]> wrote:
I don't think HA will help to recover from cluster crash, for that we should take periodic savepoint right? Please correct me in case i am wrong

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <[hidden email]> wrote:
Suppose my cluster got crashed and need to bring up the entire cluster back? Does HA still helps to run the cluster from latest save point? 

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <[hidden email]> wrote:
thanks to everyone for all the replies.

i think the original concern here with "just" relying on the HA option is that there are some disaster recovery and data center migration use cases where the continuity of the job managers is difficult to preserve. but those are admittedly very edgy use cases. i think it's definitely worth reviewing the SLAs with our site reliability engineers to see how likely it would be to completely lose all job managers under an HA configuration. that small a risk might be acceptable/preferable to a one-off solution.

@Aleksander, would love to learn more about Zookeeper-less HA. i think i spotted a thread somewhere between Till and someone (perhaps you) about that. feel free to DM me.

thanks again to everyone!

On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <[hidden email]> wrote:
Hi, Aleksandar

Savepoint option in standalone job cluster is optional. If you want to always recover 
from the latest checkpoint, just as Aleksandar and Yun Tang said you could use the 
high-availability configuration. Make sure the cluster-id is not changed, i think the job 
could recover both at exceptionally crash and restart by expectation.

[hidden email], we are also have an zookeeper-less high-availability implementation[1].
Maybe we could have some discussion and contribute this useful feature to the community.


Best,
Yang

Aleksandar Mastilovic <[hidden email]> 于2019年9月26日周四 上午4:11写道:
Would you guys (Flink devs) be interested in our solution for zookeeper-less HA? I could ask the managers how they feel about open-sourcing the improvement.

On Sep 25, 2019, at 11:49 AM, Yun Tang <[hidden email]> wrote:

As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.


Best
Yun Tang

From: Aleksandar Mastilovic <[hidden email]>
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester <[hidden email]>
Cc: Hao Sun <[hidden email]>; Yuval Itzchakov <[hidden email]>; user <[hidden email]>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes
 
Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <[hidden email]> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:


the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <[hidden email]> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



-- 
Best Regards,
Yuval Itzchakov.


-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 

Reply | Threaded
Open this post in threaded view
|

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Yun Tang
Just a minor supplement [hidden email], if you decided to drop a operator, don't forget to add --allowNonRestoredState (short: -n) option [1]



Best
Yun Tang


From: Vijay Bhaskar <[hidden email]>
Sent: Thursday, October 10, 2019 19:24
To: Yang Wang <[hidden email]>
Cc: Sean Hester <[hidden email]>; Aleksandar Mastilovic <[hidden email]>; Yun Tang <[hidden email]>; Hao Sun <[hidden email]>; Yuval Itzchakov <[hidden email]>; user <[hidden email]>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes
 
Thanks Yang. We will try and let you know if any issues arise

Regards
Bhaskar

On Thu, Oct 10, 2019 at 1:53 PM Yang Wang <[hidden email]> wrote:
@ Hao Sun,
I have made a confirmation that even we change parallelism and/or modify operators, add new operators,
the flink cluster could also recover from latest checkpoint.

@ Vijay
a) Some individual jobmanager/taskmanager crashed exceptionally(someother jobmanagers 
and taskmanagers are alive), it could recover from the latest checkpoint.
b) All jobmanagers and taskmanagers fails, it could still recover from the latest checkpoint if the cluster-id
is not changed. 

When we enable the HA, The meta of jobgraph and checkpoint is saved on zookeeper and the real files are save
on high-availability storage(HDFS). So when the flink application is submitted again with same cluster-id, it could
recover jobs and checkpoint from zookeeper. I think it has been supported for a long time. Maybe you could have a
try with flink-1.8 or 1.9.

Best,
Yang


Vijay Bhaskar <[hidden email]> 于2019年10月10日周四 下午2:26写道:
Thanks Yang and Sean. I have couple of questions:

1) Suppose the scenario of , bringing back entire cluster,
     a) In that case, at least one job manager out of HA group should be up and running right? or
     b) All the job managers fails, then also this works? In that case please let me know the procedure/share the documentation? 
         How to start from previous check point?
         What Flink version onwards this feature is stable?

Regards
Bhaskar


On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <[hidden email]> wrote:
Hi Vijay,

If you are using HA solution, i think you do not need to specify the savepoint. Instead the checkpoint is used.
The checkpoint is done automatically and periodically based on your configuration.When the 
jobmanager/taskmanager fails or the whole cluster crashes, it could always recover from the latest
checkpoint. Does this meed your requirement?

Best,
Yang

Sean Hester <[hidden email]> 于2019年10月1日周二 上午1:47写道:
Vijay,

That is my understanding as well: the HA solution only solves the problem up to the point all job managers fail/restart at the same time. That's where my original concern was. 

But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers per cluster--as long as they are all deployed to separate GKE nodes--would provide a very high uptime/low failure rate, at least on paper. It's a promising enough option that we're going to run in HA for a month or two and monitor results before we put in any extra work to customize the savepoint start-up behavior.

On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <[hidden email]> wrote:
I don't think HA will help to recover from cluster crash, for that we should take periodic savepoint right? Please correct me in case i am wrong

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <[hidden email]> wrote:
Suppose my cluster got crashed and need to bring up the entire cluster back? Does HA still helps to run the cluster from latest save point? 

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <[hidden email]> wrote:
thanks to everyone for all the replies.

i think the original concern here with "just" relying on the HA option is that there are some disaster recovery and data center migration use cases where the continuity of the job managers is difficult to preserve. but those are admittedly very edgy use cases. i think it's definitely worth reviewing the SLAs with our site reliability engineers to see how likely it would be to completely lose all job managers under an HA configuration. that small a risk might be acceptable/preferable to a one-off solution.

@Aleksander, would love to learn more about Zookeeper-less HA. i think i spotted a thread somewhere between Till and someone (perhaps you) about that. feel free to DM me.

thanks again to everyone!

On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <[hidden email]> wrote:
Hi, Aleksandar

Savepoint option in standalone job cluster is optional. If you want to always recover 
from the latest checkpoint, just as Aleksandar and Yun Tang said you could use the 
high-availability configuration. Make sure the cluster-id is not changed, i think the job 
could recover both at exceptionally crash and restart by expectation.

[hidden email], we are also have an zookeeper-less high-availability implementation[1].
Maybe we could have some discussion and contribute this useful feature to the community.


Best,
Yang

Aleksandar Mastilovic <[hidden email]> 于2019年9月26日周四 上午4:11写道:
Would you guys (Flink devs) be interested in our solution for zookeeper-less HA? I could ask the managers how they feel about open-sourcing the improvement.

On Sep 25, 2019, at 11:49 AM, Yun Tang <[hidden email]> wrote:

As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.


Best
Yun Tang

From: Aleksandar Mastilovic <[hidden email]>
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester <[hidden email]>
Cc: Hao Sun <[hidden email]>; Yuval Itzchakov <[hidden email]>; user <[hidden email]>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes
 
Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <[hidden email]> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:


the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <[hidden email]> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <[hidden email]> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <[hidden email]> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
  • is this is already a solved problem that i've missed?
  • is this issue already on the community's radar?
thanks in advance!

-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



-- 
Best Regards,
Yuval Itzchakov.


-- 
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 

12