[SURVEY] How do you use high-availability services in Flink?


[SURVEY] How do you use high-availability services in Flink?

tison
Hi guys,

We want to get an accurate idea of how users actually use
high-availability services in Flink, especially how you customize
high-availability services via HighAvailabilityServicesFactory.

Basically, there are a standalone implementation, a ZooKeeper
implementation, an embedded implementation used in MiniCluster, a YARN
implementation that has not yet been implemented, and a gateway to
customized implementations.

Generally, I think the standalone and ZooKeeper implementations are the
most widely used. The embedded implementation is used, without users
being aware of it, when they run a MiniCluster.

Besides that, it would be helpful to know how you customize
high-availability services using the HighAvailabilityServicesFactory
interface, as input for the ongoing FLINK-10333[1], which would evolve
high-availability services in Flink. It would also be good to know
whether any users are interested in the not-yet-implemented YARN
implementation.

Any use case would be helpful. I really appreciate your time and your
insight.
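For context, the high-availability mode is selected in flink-conf.yaml. The fragment below is a sketch rather than a definitive reference: the keys reflect the configuration options of the Flink 1.9 era, and the custom factory class name is hypothetical.

```yaml
# ZooKeeper-based high availability (flink-conf.yaml)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha

# Custom high-availability services: set the option to the fully
# qualified class name of a HighAvailabilityServicesFactory
# (hypothetical example):
# high-availability: com.example.MyHaServicesFactory
```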


Re: [SURVEY] How do you use high-availability services in Flink?

tison
In addition, FLINK-13750[1] is also likely to introduce a breaking
change to high-availability services, so those of you who might be
affected by the change are highly encouraged to share your cases :-)



Zili Chen <[hidden email]> wrote on Wed, Aug 21, 2019 at 3:32 PM:


Re: [SURVEY] How do you use high-availability services in Flink?

Aleksandar Mastilovic
In reply to this post by tison
Hi all,

Since I’m currently working on an implementation of HighAvailabilityServicesFactory, I thought it would be good to report here about my experience so far.

Our use case is cloud-based: we package Flink and our supplementary code into a Docker image, then run those images through Kubernetes + Helm orchestration.

We don’t use Hadoop or HDFS but rather Google Cloud Storage, and we don’t run ZooKeeper. Our Flink setup consists of one JobManager and multiple TaskManagers on demand.

Due to the nature of cloud computing, there’s a possibility our JobManager instance might go down, only to be automatically recreated through Kubernetes. Since we don’t run ZooKeeper,
we needed a way to run a variant of a high-availability cluster that keeps JobManager information on our attached persistent Kubernetes volume instead of in ZooKeeper.

So far we have a setup that seems to be working in our local deployment; we haven’t yet tried it in the actual cloud.

As far as implementation goes, here’s what we did:

We used MapDB (mapdb.org) as our storage format to persist lists of objects to disk. We partially relied on StandaloneHaServices for our HaServices implementation; otherwise, we looked at ZooKeeperHaServices and related classes for inspiration and guidance.

Here’s a list of new classes:

FileSystemCheckpointIDCounter implements CheckpointIDCounter
FileSystemCheckpointRecoveryFactory implements CheckpointRecoveryFactory
FileSystemCompletedCheckpointStore implements CompletedCheckpointStore
FileSystemHaServices extends StandaloneHaServices
FileSystemHaServicesFactory implements HighAvailabilityServicesFactory
FileSystemSubmittedJobGraphStore implements SubmittedJobGraphStore
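To illustrate what one of these pieces does, here is a minimal, self-contained sketch of the idea behind a file-backed checkpoint ID counter: a restarted JobManager should resume numbering rather than reuse old IDs. This is not the actual FileSystemCheckpointIDCounter above (whose code is not shown in this thread); the class name and behavior are illustrative only.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: a checkpoint ID counter persisted to a file, so a
// recreated JobManager continues from the last issued ID. Illustrative
// only; it shares no code with Flink's classes mentioned in this thread.
class FileBackedCounter {
    private final Path file;

    FileBackedCounter(Path file) {
        this.file = file;
    }

    // Read the last persisted value, defaulting to 1 for a fresh store.
    long get() throws IOException {
        if (!Files.exists(file)) {
            return 1L;
        }
        String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        return content.isEmpty() ? 1L : Long.parseLong(content);
    }

    // Return the current value and durably persist its successor.
    long getAndIncrement() throws IOException {
        long current = get();
        Files.write(file, Long.toString(current + 1).getBytes(StandardCharsets.UTF_8));
        return current;
    }
}
```

A restart is simulated simply by constructing a new counter over the same file; the successor value survives because it was written before the previous value was handed out.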

Testing so far has shown that bringing down a JobManager and bringing it back up does indeed restore all the running jobs. Job creation/destruction also works.

Hope this helps!

Thanks,
Aleksandar Mastilovic

On Aug 21, 2019, at 12:32 AM, Zili Chen <[hidden email]> wrote:




Re: [SURVEY] How do you use high-availability services in Flink?

tison
Thanks for your email, Aleksandar! Sorry for the late reply.

May I ask a question: do you configure high-availability.storageDir in your case?
That is, do you persist and retrieve the job graph & checkpoints entirely in MapDB,
or, as the ZooKeeper implementation does, persist them in an external filesystem
and just store a handle in MapDB?

Best,
tison.
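The distinction in the question above, a small handle in the coordination store pointing at a larger payload in a filesystem, can be sketched as follows. This is a self-contained illustrative example, not Flink code; the class and method names are hypothetical, and an in-memory map stands in for the coordination store (ZooKeeper, or MapDB in this thread's setup).

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the handle-vs-payload pattern: the bulky state
// (job graphs, completed checkpoints) goes to a filesystem directory
// (the role of high-availability.storageDir), while the coordination
// store keeps only a small pointer -- here, the file path.
class HandleStore {
    private final Path storageDir;                        // stands in for high-availability.storageDir
    private final Map<String, String> handles = new HashMap<>(); // stands in for ZooKeeper/MapDB

    HandleStore(Path storageDir) {
        this.storageDir = storageDir;
    }

    // Persist the payload to the filesystem; record only its location.
    void put(String key, byte[] payload) throws IOException {
        Path file = storageDir.resolve(key);
        Files.write(file, payload);
        handles.put(key, file.toString());
    }

    // Resolve the stored handle back to the payload.
    byte[] get(String key) throws IOException {
        return Files.readAllBytes(java.nio.file.Paths.get(handles.get(key)));
    }
}
```

The advantage of this split is that the coordination store only ever holds tiny entries, while the filesystem absorbs the large, rarely-read payloads.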


Aleksandar Mastilovic <[hidden email]> wrote on Sat, Aug 24, 2019 at 7:04 AM:



Re: [SURVEY] How do you use high-availability services in Flink?

Aleksandar Mastilovic
Hi Zili,

Sorry for replying late; we had a holiday here in the US.

We are using high-availability.storageDir, but only for the blob store; job graphs, checkpoints, and checkpoint IDs are stored in MapDB.
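A sketch of the configuration this answer describes might look as follows. The factory class's package and the bucket path are hypothetical; the split (storageDir for blobs, MapDB for everything else) comes from the implementation, not from these keys.

```yaml
# Illustrative flink-conf.yaml fragment (names hypothetical):
# the custom factory routes job graphs, checkpoints, and checkpoint IDs
# to MapDB on the persistent volume; storageDir backs only the blob store.
high-availability: com.example.FileSystemHaServicesFactory
high-availability.storageDir: gs://my-bucket/flink/ha
```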

On Aug 28, 2019, at 7:48 PM, Zili Chen <[hidden email]> wrote:
