Flink HA with Kubernetes, without Zookeeper


Flink HA with Kubernetes, without Zookeeper

Hao Sun
Hi, I am new to Flink and trying to bring up a Flink cluster on top of Kubernetes.

For HA setup, with kubernetes, I think I just need one job manager and do not need Zookeeper? I will store all states to S3 buckets. So in case of failure, kubernetes can just bring up a new job manager without losing anything?

I want to confirm my assumptions above make sense. Thanks

Re: Flink HA with Kubernetes, without Zookeeper

Shannon Carey
Zookeeper should still be necessary even in that case, because it is where the JobManager stores information which needs to be recovered after the JobManager fails.

We're eyeing https://github.com/coreos/zetcd as a way to run Zookeeper on top of Kubernetes' etcd cluster so that we don't have to rely on a separate Zookeeper cluster. However, we haven't tried it yet.

-Shannon


Re: Flink HA with Kubernetes, without Zookeeper

Stephan Ewen
Hi!

That is a very interesting proposition. In cases where you have a single master only, you may get away with quite good guarantees without ZK. In fact, Flink does not store significant data in ZK at all; it only uses locks and counters.

You can have a setup without ZK, provided you have the following:

  - All processes restart (a lost JobManager restarts eventually). Should be given in Kubernetes.

  - A way for TaskManagers to discover the restarted JobManager. Should work via Kubernetes as well (restarted containers retain the external hostname).

  - A way to isolate different "leader sessions" against each other. Flink currently uses ZooKeeper to also attach a "leader session ID" to leader election. This ID is a fencing token that prevents processes from talking to each other when they have different views on who is the leader, or on whether the leader lost and re-gained leadership.

  - An atomic marker for what is the latest completed checkpoint.

  - A distributed atomic counter for the checkpoint ID. This is crucial to ensure correctness of checkpoints in the presence of JobManager failures and re-elections or split-brain situations.

I would assume that etcd can provide all of those services. The best way to integrate it would probably be to add an implementation of Flink's "HighAvailabilityServices" based on etcd.

Have a look at this class:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java

If you want to contribute an extension of Flink using etcd, that would be awesome.
This should have a FLIP though, and a plan on how to set up rigorous unit testing of that implementation (because its correctness is crucial to Flink's HA resilience).

Best,
Stephan
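
[Editor's sketch] Two of the requirements listed above, the distributed atomic counter for the checkpoint ID and the "leader session ID" fencing token, can both be built on a compare-and-swap primitive of the kind etcd or ZooKeeper offers. The following is only a minimal in-memory illustration of that idea in Python. `VersionedStore`, the key names, and all function names are hypothetical stand-ins and are not part of Flink, etcd, or ZooKeeper. The atomic marker for the latest completed checkpoint is not covered here.

```python
import threading
import uuid


class VersionedStore:
    """In-memory stand-in for a store with compare-and-swap (hypothetical)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}  # key -> (value, version)

    def get(self, key):
        """Return (value, version); a missing key reads as (None, 0)."""
        with self._lock:
            return self._data.get(key, (None, 0))

    def compare_and_swap(self, key, expected_version, new_value):
        """Write new_value only if the key is still at expected_version."""
        with self._lock:
            _, version = self._data.get(key, (None, 0))
            if version != expected_version:
                return False
            self._data[key] = (new_value, version + 1)
            return True


def next_checkpoint_id(store):
    """Atomically increment the shared checkpoint ID, retrying on conflict."""
    while True:
        value, version = store.get("checkpoint-id")
        candidate = (value or 0) + 1
        if store.compare_and_swap("checkpoint-id", version, candidate):
            return candidate


def become_leader(store):
    """Claim leadership and return a fresh leader session ID (fencing token)."""
    session_id = str(uuid.uuid4())
    while True:
        _, version = store.get("leader-session")
        if store.compare_and_swap("leader-session", version, session_id):
            return session_id


def accept_message(store, message_session_id):
    """Receivers drop messages that carry a stale leader session ID."""
    current, _ = store.get("leader-session")
    return message_session_id == current
```

Because every increment goes through `compare_and_swap`, two JobManagers that both believe they are the leader can never hand out the same checkpoint ID, and a message tagged with an old session ID is rejected once a new leader has registered.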



Re: Flink HA with Kubernetes, without Zookeeper

Hao Sun
Thanks Shannon for the https://github.com/coreos/zetcd tip. I will check that out and share my results if we proceed down that path.
Thanks Stephan for the details, this is very useful. I was about to ask what exactly is stored in ZooKeeper, haha.


Re: Flink HA with Kubernetes, without Zookeeper

James Bucher
Just wanted to throw in a couple more details here from what I have learned from working with Kubernetes.

All processes restart (a lost JobManager restarts eventually). Should be given in Kubernetes:
  • This works very well. We run multiple jobs with a single JobManager, and Flink/Kubernetes recovers quite well.
A way for TaskManagers to discover the restarted JobManager. Should work via Kubernetes as well (restarted containers retain the external hostname):
  • We use StatefulSets, which provide a DNS-based discovery mechanism. Provided DNS is set up correctly with TTLs, this works well. You could also leverage the built-in Kubernetes Services if you are only running a single JobManager: Kubernetes will just route the traffic to the single pod. This works fine with a single JobManager (I have tested it). However, multiple JobManagers won’t work, because Kubernetes will route traffic round-robin across the JobManagers.
A way to isolate different "leader sessions" against each other. Flink currently uses ZooKeeper to also attach a "leader session ID" to leader election, which is a fencing token to avoid that processes talk to each other despite having different views on who is the leader, or whether the leader lost and re-gained leadership:
  • This is probably the most difficult thing. You could leverage the built-in etcd cluster. Connecting directly to the Kubernetes etcd database is probably a bad idea, however. You should be able to create a counter using the PATCH API that Kubernetes supplies, which follows https://tools.ietf.org/html/rfc6902. You could probably leverage https://tools.ietf.org/html/rfc6902#section-4.6 to allow for atomic updates to counters. Combining this with https://kubernetes.io/docs/concepts/api-extension/custom-resources/#custom-resources should give a good way to work with etcd without actually connecting to the Kubernetes etcd directly. This integration would require modifying the JobManager leader election code.
A distributed atomic counter for the checkpoint ID. This is crucial to ensure correctness of checkpoints in the presence of JobManager failures and re-elections or split-brain situations:
  • This is very similar to the above. We should be able to accomplish that through the PATCH API combined with an "update if" condition.
If you don’t want to actually rip your way into the code for the JobManager, the etcd Operator would be a good way to bring up an etcd cluster that is separate from the core Kubernetes etcd database. Combined with zetcd, you could probably have that up and running quickly.
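
[Editor's sketch] The "PATCH API combined with update if condition" idea can be illustrated with RFC 6902 JSON Patch: a `test` operation (section 4.6) followed by a `replace`. A patch is applied atomically, so if the `test` fails the counter is left untouched and the caller must re-read and retry. The Python below only builds such a patch and applies it to a local dictionary. The resource layout and the `/spec/checkpointId` path are hypothetical, and the tiny applier is an illustration that handles just these two operations on object paths, not a full RFC 6902 implementation. Against a real cluster the patch body would be sent with the `application/json-patch+json` content type.

```python
import copy


def increment_patch(path, current):
    """Build a JSON Patch that bumps a counter only if it still equals `current`."""
    return [
        {"op": "test", "path": path, "value": current},
        {"op": "replace", "path": path, "value": current + 1},
    ]


def apply_patch(document, patch):
    """Apply 'test' and 'replace' operations to a copy of `document`.

    The whole patch is atomic: if any operation fails, an exception is
    raised and the caller's document is left unchanged.
    """
    result = copy.deepcopy(document)
    for op in patch:
        # Decode the JSON Pointer ("~1" is "/", "~0" is "~"), skipping the leading "".
        keys = [k.replace("~1", "/").replace("~0", "~")
                for k in op["path"].split("/")[1:]]
        parent = result
        for key in keys[:-1]:
            parent = parent[key]
        leaf = keys[-1]
        if op["op"] == "test":
            if leaf not in parent or parent[leaf] != op["value"]:
                raise ValueError(f"test failed at {op['path']}")
        elif op["op"] == "replace":
            if leaf not in parent:
                raise KeyError(op["path"])
            parent[leaf] = op["value"]
        else:
            raise NotImplementedError(op["op"])
    return result


# Hypothetical custom resource holding the checkpoint counter.
resource = {"spec": {"checkpointId": 41}}
patch = increment_patch("/spec/checkpointId", 41)
updated = apply_patch(resource, patch)  # updated counter is 42
```

A second JobManager that still believes the counter is 41 would send the same patch, fail the `test`, and have to re-read the resource before trying again.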

Thanks,
James Bucher
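
[Editor's sketch] On the discovery point in the message above: a pod in a StatefulSet that is governed by a headless Service gets a stable DNS name of the form `<statefulset>-<ordinal>.<service>.<namespace>.svc.<cluster-domain>`, which is what lets TaskManagers find a restarted JobManager at the same address. The Python below just assembles that name and the matching Flink configuration lines. The StatefulSet and Service names are hypothetical, and `cluster.local` is assumed as the cluster domain.

```python
def statefulset_pod_dns(statefulset, ordinal, service,
                        namespace="default", cluster_domain="cluster.local"):
    """Return the stable DNS name of one pod in a StatefulSet."""
    return f"{statefulset}-{ordinal}.{service}.{namespace}.svc.{cluster_domain}"


def flink_conf_lines(jobmanager_host, rpc_port=6123):
    """Return flink-conf.yaml lines that point TaskManagers at the JobManager."""
    return [
        f"jobmanager.rpc.address: {jobmanager_host}",
        f"jobmanager.rpc.port: {rpc_port}",
    ]


# Hypothetical names: a StatefulSet and headless Service both called "flink-jobmanager".
host = statefulset_pod_dns("flink-jobmanager", 0, "flink-jobmanager")
conf = flink_conf_lines(host)
```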


Re: Flink HA with Kubernetes, without Zookeeper

Hao Sun

Great suggestions, the etcd operator is very interesting, thanks James.



Re: Flink HA with Kubernetes, without Zookeeper

Ufuk Celebi
Thanks James for sharing your experience. I find it very interesting :-)


On Tue, Aug 22, 2017 at 9:50 PM, Hao Sun <[hidden email]> wrote:

> Great suggestions, the etcd operator is very interesting, thanks James.
>
>
> On Tue, Aug 22, 2017, 12:42 James Bucher <[hidden email]> wrote:
>>
>> Just wanted to throw in a couple more details here from what I have
>> learned from working with Kubernetes.
>>
>> All processes restart (a lost JobManager restarts eventually). Should be
>> given in Kubernetes:
>>
>> This works very well, we run multiple jobs with a single Jobmanager and
>> Flink/Kubernetes recovers quite well.
>>
>> A way for TaskManagers to discover the restarted JobManager. Should work
>> via Kubernetes as well (restarted containers retain the external hostname):
>>
>> We use StatefulSets which provide a DNS based discovery mechanism.
>> Provided DNS is set up correctly with TTLs this works well. You could also
>> leverage the built-in Kubernetes services if you are only running a single
>> Job Manager. Kubernetes will just route the traffic to the single pod. This
>> works fine with a single Job Manager (I have tested it). However multiple
>> Job Managers won’t work because Kubernetes will route this round-robin to
>> the Job Managers
>>
>> A way to isolate different "leader sessions" against each other. Flink
>> currently uses ZooKeeper to also attach a "leader session ID" to leader
>> election, which is a fencing token to avoid that processes talk to each
>> other despite having different views on who is the leader, or whether the
>> leader lost and re-gained leadership:
>>
>> This is probably the most difficult thing. You could leverage the built-in
>> ETCD cluster. Connecting directly to the Kubernetes ETCD database is
>> probably a bad idea, however. You should be able to create a counter using
>> the PATCH API that Kubernetes supplies, which follows
>> https://tools.ietf.org/html/rfc6902. You could probably leverage
>> https://tools.ietf.org/html/rfc6902#section-4.6 to allow for atomic updates
>> to counters. Combining this with
>> https://kubernetes.io/docs/concepts/api-extension/custom-resources/#custom-resources
>> should give a good way to work with ETCD without actually connecting to
>> the Kubernetes ETCD directly. This integration would require
>> modifying the Job Manager leader election code.
>>
>> A distributed atomic counter for the checkpoint ID. This is crucial to
>> ensure correctness of checkpoints in the presence of JobManager failures and
>> re-elections or split-brain situations.
>>
>> This is very similar to the above, we should be able to accomplish that
>> through the PATCH API combined with update if condition.
>>
>> If you don’t want to actually rip your way into the code for the Job Manager
>> the ETCD Operator would be a good way to bring up an ETCD cluster that is
>> separate from the core Kubernetes ETCD database. Combined with zetcd you
>> could probably have that up and running quickly.
>>
>> Thanks,
>> James Bucher
>>
>> From: Hao Sun <[hidden email]>
>> Date: Monday, August 21, 2017 at 9:45 AM
>> To: Stephan Ewen <[hidden email]>, Shannon Carey <[hidden email]>
>> Cc: "[hidden email]" <[hidden email]>
>> Subject: Re: Flink HA with Kubernetes, without Zookeeper
>>
>> Thanks Shannon for the https://github.com/coreos/zetcd tips, I will check
>> that out and share my results if we proceed on that path.
>> Thanks Stephan for the details, this is very useful, I was about to ask
>> what exactly is stored into zookeeper, haha.
>>
>> On Mon, Aug 21, 2017 at 9:31 AM Stephan Ewen <[hidden email]> wrote:
>>>
>>> Hi!
>>>
>>> That is a very interesting proposition. In cases where you have a single
>>> master only, you may get away with quite good guarantees without ZK. In
>>> fact, Flink does not store significant data in ZK at all, it only uses locks
>>> and counters.
>>>
>>> You can have a setup without ZK, provided you have the following:
>>>
>>>   - All processes restart (a lost JobManager restarts eventually). Should
>>> be given in Kubernetes.
>>>
>>>   - A way for TaskManagers to discover the restarted JobManager. Should
>>> work via Kubernetes as well (restarted containers retain the external
>>> hostname)
>>>
>>>   - A way to isolate different "leader sessions" against each other.
>>> Flink currently uses ZooKeeper to also attach a "leader session ID" to
>>> leader election, which is a fencing token to avoid that processes talk to
>>> each other despite having different views on who is the leader, or whether
>>> the leader lost and re-gained leadership.
>>>
>>>   - An atomic marker for what is the latest completed checkpoint.
>>>
>>>   - A distributed atomic counter for the checkpoint ID. This is crucial
>>> to ensure correctness of checkpoints in the presence of JobManager failures
>>> and re-elections or split-brain situations.
>>>
>>> I would assume that etcd can provide all of those services. The best way
>>> to integrate it would probably be to add an implementation of Flink's
>>> "HighAvailabilityServices" based on etcd.
>>>
>>> Have a look at this class:
>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
>>>
>>> If you want to contribute an extension of Flink using etcd, that would be
>>> awesome.
>>> This should have a FLIP though, and a plan on how to set up rigorous unit
>>> testing of that implementation (because its correctness is very crucial to
>>> Flink's HA resilience).
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Mon, Aug 21, 2017 at 4:15 PM, Shannon Carey <[hidden email]>
>>> wrote:
>>>>
>>>> Zookeeper should still be necessary even in that case, because it is
>>>> where the JobManager stores information which needs to be recovered after
>>>> the JobManager fails.
>>>>
>>>> We're eyeing https://github.com/coreos/zetcd as a way to run Zookeeper
>>>> on top of Kubernetes' etcd cluster so that we don't have to rely on a
>>>> separate Zookeeper cluster. However, we haven't tried it yet.
>>>>
>>>> -Shannon
>>>>
>>>> From: Hao Sun <[hidden email]>
>>>> Date: Sunday, August 20, 2017 at 9:04 PM
>>>> To: "[hidden email]" <[hidden email]>
>>>> Subject: Flink HA with Kubernetes, without Zookeeper
>>>>
>>>> Hi, I am new to Flink and trying to bring up a Flink cluster on top of
>>>> Kubernetes.
>>>>
>>>> For HA setup, with kubernetes, I think I just need one job manager and
>>>> do not need Zookeeper? I will store all states to S3 buckets. So in case of
>>>> failure, kubernetes can just bring up a new job manager without losing
>>>> anything?
>>>>
>>>> I want to confirm my assumptions above make sense. Thanks
>>>
>>>
>