Yep, I know that option. That's where I get confused as well. In an HA setup, where do I supply this option (allowNonRestoredState)? As I recall, this option requires a savepoint path when I start a Flink job, and HA does not require that path.

Hao Sun

On Thu, Oct 10, 2019 at 11:16 AM Yun Tang <[hidden email]> wrote:
Hi Hao
It seems that I misunderstood the background of your use case. The high-availability configuration targets fault tolerance, not general development evolution. If you want to change your job topology, just follow the general rule and restore from a savepoint/checkpoint;
do not rely on HA to do job-migration work.
Best
Yun Tang
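For reference, the flag normally travels together with the savepoint path itself: on the CLI it is `flink run -s <savepointPath> -n` (`--allowNonRestoredState`), and programmatically it is part of the restore settings attached to the job at submission. A minimal sketch of that programmatic form, assuming flink-runtime is on the classpath; the savepoint path below is made up:

import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

public class RestoreSettingsSketch {
    public static void main(String[] args) {
        // Programmatic counterpart of "-s <savepointPath> --allowNonRestoredState":
        // restore from the given savepoint and drop any state that no longer maps
        // to an operator in the new job graph. The path is illustrative.
        SavepointRestoreSettings fromSavepoint =
                SavepointRestoreSettings.forPath("s3://my-bucket/savepoints/savepoint-abc123", true);

        // Counterpart of submitting with no "-s" at all: nothing to restore from a
        // savepoint, so an HA restart simply recovers from the latest checkpoint.
        SavepointRestoreSettings noSavepoint = SavepointRestoreSettings.none();

        System.out.println(fromSavepoint);
        System.out.println(noSavepoint);
    }
}

In other words, the flag rides along with the savepoint path, which is why a plain HA restart never asks for one.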
We saw the logs below in production some time ago; after that we stopped using HA. Do you think HA was enabled properly, judging from these logs?

Regards
Bhaskar

2019-09-24 17:40:17,675 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService - Starting ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
2019-09-24 17:40:17,675 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-09-24 17:40:20,975 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.NoRouteToHostException: No route to host
2019-09-24 17:40:20,976 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp:[hidden email]:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp:[hidden email]:6123]] Caused by: [No route to host]
2019-09-24 17:40:23,976 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.NoRouteToHostException: No route to host
2019-09-24 17:40:23,977 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp:[hidden email]:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp:[hidden email]:6123]] Caused by: [No route to host]
2019-09-24 17:40:26,982 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.NoRouteToHostException: No route to host
2019-09-24 17:40:26,983 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp:[hidden email]:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp:[hidden email]:6123]] Caused by: [No route to host]
2019-09-24 17:40:29,988 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.NoRouteToHostException: No route to host
2019-09-24 17:40:29,988 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp:[hidden email]:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp:[hidden email]:6123]] Caused by: [No route to host]
2019-09-24 17:40:32,994 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.NoRouteToHostException: No route to host
2019-09-24 17:40:32,995 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp:[hidden email]:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp:[hidden email]:6123]] Caused by: [No route to host]
2019-09-24 17:40:36,000 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.NoRouteToHostException: No route to host
2019-09-24 17:40:36,001 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp:[hidden email]:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp:[hidden email]:6123]] Caused by: [No route to host]
2019-09-24 17:40:39,006 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.NoRouteToHostException: No route to host

On Fri, Oct 11, 2019 at 9:39 AM Yun Tang <[hidden email]> wrote:
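As background for the "is HA enabled properly" question: the ZooKeeperLeaderElectionService / ZooKeeperLeaderRetrievalService lines do show the ZooKeeper-backed services starting, while enabling HA itself comes down to a handful of configuration keys. A small sketch of those keys in their programmatic form (the same keys normally live in flink-conf.yaml; the quorum address, storage path and cluster id below are placeholders):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;

public class HaConfigSketch {
    public static void main(String[] args) {
        // Programmatic form of the flink-conf.yaml HA keys; values are placeholders.
        Configuration conf = new Configuration();
        conf.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        conf.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "zk-0:2181,zk-1:2181,zk-2:2181");
        conf.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "s3://my-bucket/flink/ha");
        conf.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "/my-job-cluster");
        System.out.println(conf);
    }
}

The NoRouteToHost warnings that follow are about reaching the JobManager's RPC port (6123), so they look more like a connectivity problem than an HA-configuration one.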
Apart from this, we have other environments where checkpointing worked fine in HA mode even with a complete cluster restart. But for one job we are seeing an issue: the checkpoint path is retrieved from ZooKeeper, yet that checkpoint path cannot be found in persistent storage. I am wondering why this would happen in the first place. Is there any synchronization issue between writing the file to the persistent path and registering the file with the HA service? For example, could a checkpoint be registered in ZooKeeper while it has not yet been written when the cluster restarts? I suspect this kind of problem can happen. We are using Flink 1.6.2 in production. Is this an issue that was already known and fixed recently?

Regards
Bhaskar

On Fri, Oct 11, 2019 at 2:08 PM Vijay Bhaskar <[hidden email]> wrote:
Hi Vijay,

Flink usually writes the checkpoint data to disk first and then writes the pointer to those files to ZooKeeper. Hence, if you see a ZooKeeper entry, the files should be there. I assume that there is no other process accessing and potentially removing files from the checkpoint directories, right?

Have you tried running one of the latest Flink versions? Flink 1.6.2 is no longer actively supported by the community.

Cheers,
Till

On Fri, Oct 11, 2019 at 11:39 AM Vijay Bhaskar <[hidden email]> wrote:
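A conceptual sketch of the ordering Till describes, purely for illustration and not Flink's actual implementation: the checkpoint data is persisted first, and only afterwards is a pointer registered (in Flink's case, in ZooKeeper), so any pointer that is visible should refer to data that already exists:

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ConcurrentSkipListMap;

public class WriteThenRegister {
    // Stand-in for the ZooKeeper-backed completed-checkpoint store.
    private final ConcurrentSkipListMap<Long, Path> registeredPointers = new ConcurrentSkipListMap<>();

    void completeCheckpoint(long checkpointId, byte[] stateData, Path checkpointDir) throws Exception {
        // Step 1: write the checkpoint data to durable storage.
        Path dataFile = checkpointDir.resolve("chk-" + checkpointId);
        Files.write(dataFile, stateData);

        // Step 2: only after the write succeeded, register the pointer (here a map entry,
        // in Flink a ZooKeeper node) so that recovery can find it.
        registeredPointers.put(checkpointId, dataFile);
    }

    public static void main(String[] args) throws Exception {
        WriteThenRegister store = new WriteThenRegister();
        Path dir = Files.createTempDirectory("chk");
        store.completeCheckpoint(42L, "state-bytes".getBytes(), dir);
        System.out.println("registered: " + store.registeredPointers);
    }
}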
Thank you, Till. We will try to shift to the latest Flink version.

Regards
Bhaskar

On Mon, Oct 14, 2019 at 7:43 PM Till Rohrmann <[hidden email]> wrote:
In reply to this post by Hao Sun
I'm not the original poster, but I'm running into this same issue, and what you just described is exactly what I want. I presume you guys are using some variant of this Helm chart, https://github.com/docker-flink/examples/tree/master/helm/flink, to configure your k8s cluster? I'm also assuming that this cluster is running as a job cluster and not a session cluster, right?
If so, how did you guys set up the deployments.yaml file such that it picks up the latest savepoint from a savepoint directory? And what happens if that savepoint directory is empty? This is for cases where we're starting a new cluster and a new job from scratch, so there's no need to recover from a previous savepoint.

On 2019/09/24 16:23:52, Hao Sun <[hidden email]> wrote:
> We always make a savepoint before we shut down the job-cluster, so the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.
>
> Maybe I do not understand your use case well; I do not see a need to start from a checkpoint after a bug fix. From what I know, currently you can use a checkpoint as a savepoint as well.
>
> Hao Sun
>
> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <[hidden email]> wrote:
>
>> AFAIK there's currently nothing implemented to solve this problem, but a possible fix could be implemented on top of https://github.com/lyft/flinkk8soperator, which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved, as this is an issue I've been thinking about as well.
>>
>> Yuval
>>
>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <[hidden email]> wrote:
>>
>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.
>>>
>>> we're running ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.
>>>
>>> we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume its "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.
>>>
>>> the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.
>>>
>>> i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
>>>
>>> - is this already a solved problem that i've missed?
>>> - is this issue already on the community's radar?
>>>
>>> thanks in advance!
>>>
>>> --
>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>> <http://www.bettercloud.com>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
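On the "what happens if that savepoint directory is empty" part, one pattern is to let the entrypoint resolve the newest savepoint at startup and only add `-s` when one exists. A rough sketch of such a resolver, assuming savepoints sit on a mounted volume and keep the default `savepoint-*` directory naming; for GCS or S3 you would use the corresponding client instead of java.nio:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

public class LatestSavepointResolver {

    /** Newest "savepoint-*" directory under root, if any (by last-modified time). */
    static Optional<Path> latestSavepoint(Path root) throws IOException {
        if (!Files.isDirectory(root)) {
            return Optional.empty();
        }
        try (Stream<Path> children = Files.list(root)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(p -> p.getFileName().toString().startsWith("savepoint-"))
                    .max(Comparator.comparingLong(p -> p.toFile().lastModified()));
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Paths.get(args.length > 0 ? args[0] : "/savepoints");
        // Print "-s <path>" if a savepoint exists, or nothing, so the job starts fresh
        // (or lets HA recover from the latest checkpoint) when the directory is empty.
        System.out.println(latestSavepoint(root)
                .map(p -> "-s " + p.toAbsolutePath())
                .orElse(""));
    }
}

Its output can be spliced into the job arguments by the container entrypoint, so a fresh cluster with an empty directory simply starts without a restore path.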
In reply to this post by Yun Tang
I'm not sure if this addresses the original concern. For instance, consider this sequence:

1. Job starts from savepoint
2. Job creates a few checkpoints
3. Job manager (just one in Kubernetes) crashes and restarts with the commands specified in the Kubernetes manifest, which has the savepoint path

Will ZooKeeper-based HA ensure that this savepoint path will be ignored? I've asked this and various other questions here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Will-job-manager-restarts-lead-to-repeated-savepoint-restoration-tp40188.html
Flink should try to pick the latest checkpoint and will only use the savepoint if no newer checkpoint could be found.

Cheers,
Till

On Wed, Dec 16, 2020 at 10:13 PM vishalovercome <[hidden email]> wrote:
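Put differently, a conceptual sketch (not Flink's code) of that preference: whatever the manifest passes as `-s`, recovery first looks at the HA checkpoint store and only falls back to the savepoint when the store is empty:

import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

public class RestorePreference {
    static String chooseRestorePath(NavigableMap<Long, String> haCheckpointStore,
                                    Optional<String> savepointArgument) {
        if (!haCheckpointStore.isEmpty()) {
            return haCheckpointStore.lastEntry().getValue();   // newest checkpoint wins
        }
        return savepointArgument.orElse(null);                 // fresh start if neither exists
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> store = new TreeMap<>();
        store.put(12L, "s3://bucket/checkpoints/chk-12");
        store.put(13L, "s3://bucket/checkpoints/chk-13");
        // Even though the manifest still passes "-s savepoint-abc", chk-13 is chosen.
        System.out.println(chooseRestorePath(store, Optional.of("s3://bucket/savepoints/savepoint-abc")));
    }
}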
Thanks for your reply!
What I have seen is that the job terminates when there's an intermittent loss of connectivity with ZooKeeper. This is in fact the most common reason why our jobs are terminating at this point. Worse, the job is unable to restore from a checkpoint during some (not all) of these terminations. Under these scenarios, won't the job try to recover from a savepoint? I've gone through various tickets reporting stability issues due to ZooKeeper that you've mentioned you intend to resolve soon. But until ZooKeeper-based HA is stable, should we assume that it will repeatedly restore from savepoints? I would rather rely on Kafka offsets to resume where the job left off than on savepoints.
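If you do go the Kafka-offsets route, a minimal sketch of the usual consumer settings (using the universal flink-connector-kafka; broker, group and topic names are placeholders). Keep in mind the committed offsets only help on a completely fresh start; other operator state is still lost without a checkpoint or savepoint:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaOffsetResumeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");   // placeholder
        props.setProperty("group.id", "my-job");                 // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
        // On a start without any savepoint/checkpoint, begin from the offsets committed
        // for the consumer group; when restoring, offsets stored in Flink state win.
        consumer.setStartFromGroupOffsets();
        // Commit offsets back to Kafka on each completed checkpoint so a fresh start
        // picks up roughly where the previous run left off.
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("kafka-offset-resume-sketch");
    }
}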