FileSystemHaServices and BlobStore

Alexey Trenikhun
Hello,
I'm thinking about implementing FileSystemHaServices: a single leader, but with persistent RunningJobsRegistry, CheckpointIDCounter, CompletedCheckpointStore and JobGraphStore. I'm not sure whether I need FileSystemBlobStore or whether VoidBlobStore is enough, because I can't figure out whether the BlobStore should survive a JobManager crash. I see that ZooKeeperHaServices uses FileSystemBlobStore, but it is not clear whether that is due to having multiple JobManagers (leader + follower) or to the need to preserve BLOBs across restarts.

Thanks,
Alexey

Re: FileSystemHaServices and BlobStore

r_khachatryan
Hello Alexey,

I think you need FileSystemBlobStore as you are implementing HA Services, and BLOBs should be highly available too.
However, I'm a bit concerned about the direction in general: it essentially means re-implementing ZK functionality on top of FS.
What are the motivation and the use case?

Regards,
Roman



Re: FileSystemHaServices and BlobStore

Alexey Trenikhun
The motivation is to have a K8s HA setup without an extra component (ZooKeeper); see [1].

The purpose of the BlobStore is vague to me: what kind of BLOBs are stored? It looks like persistence of the BlobStore is not necessary if we start a job from a savepoint, but is it needed if we recover from a checkpoint?

Thanks,
Alexey

[1] https://issues.apache.org/jira/browse/FLINK-17598

Re: FileSystemHaServices and BlobStore

Alexey Trenikhun
I ran a test with a streaming job and FileSystemHaServices using VoidBlobStore (no HA BLOB storage), and it looks like the job was able to recover from both a JM restart and a TM restart. Any idea in which use cases HA BLOB storage is needed?

Thanks,
Alexey


Re: FileSystemHaServices and BlobStore

r_khachatryan
+ dev

The blob store is used for jars, serialized job and task information, and logs. You can find some information at https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture

I guess in your setup, Flink was able to pick up local files. 
HA setup presumes that Flink can survive the loss of that JM host and its local files.

I'm not sure about K8s native setup - probably VoidBlobStore is enough if there is a persistent volume.
But in the general case, FileSystemBlobStore should be used to store files on some DFS.


Regards,
Roman
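
To make Roman's distinction concrete, here is a minimal sketch of why a blob store that keeps nothing works only while the local files survive. The names `SimpleBlobStore`, `VoidStore`, `DirectoryStore` and `BlobStoreDemo` are hypothetical and deliberately simplified; they are not Flink's `BlobStore`, `VoidBlobStore` or `FileSystemBlobStore`, and the "HA directory" is just a local path standing in for a DFS.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class BlobStoreDemo {
    /** Simulates losing the local copy of a blob and asks the store to bring it back. */
    static boolean survivesLocalLoss(SimpleBlobStore store, Path workDir) {
        try {
            Files.createDirectories(workDir);
            Path local = workDir.resolve("job.jar");
            Files.write(local, new byte[] {1, 2, 3});
            store.put(local, "job.jar");
            Files.delete(local); // stands in for losing the JM host and its local files
            return store.get("job.jar", local) && Files.exists(local);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("blob-demo");
        System.out.println(survivesLocalLoss(new VoidStore(), tmp.resolve("a")));
        System.out.println(survivesLocalLoss(new DirectoryStore(tmp.resolve("ha")), tmp.resolve("b")));
    }
}

/** Hypothetical, simplified stand-in for a blob store (assumption, not Flink's interface). */
interface SimpleBlobStore {
    /** Copies a local file into HA storage; returns true if a copy was stored. */
    boolean put(Path localFile, String key);

    /** Restores a blob into localFile; returns true if the blob was found. */
    boolean get(String key, Path localFile);
}

/** Keeps nothing, so recovery depends entirely on the local files surviving. */
final class VoidStore implements SimpleBlobStore {
    public boolean put(Path localFile, String key) { return false; }
    public boolean get(String key, Path localFile) { return false; }
}

/** Copies blobs into a directory that is assumed to outlive the JobManager host. */
final class DirectoryStore implements SimpleBlobStore {
    private final Path haDir;

    DirectoryStore(Path haDir) { this.haDir = haDir; }

    public boolean put(Path localFile, String key) {
        try {
            Files.createDirectories(haDir);
            Files.copy(localFile, haDir.resolve(key), StandardCopyOption.REPLACE_EXISTING);
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public boolean get(String key, Path localFile) {
        Path stored = haDir.resolve(key);
        if (!Files.exists(stored)) {
            return false;
        }
        try {
            Files.copy(stored, localFile, StandardCopyOption.REPLACE_EXISTING);
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch the void store cannot restore the deleted file while the directory-backed store can, which mirrors the point that a persistent volume (or a jar baked into the image) is what makes the void variant workable.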



Re: FileSystemHaServices and BlobStore

Yang Wang
Hi Alexey,

Glad to hear that you are interested in the K8s HA support.

Roman's answer is right on point.

"FileSystemBlobStore" stores the user jars, job graph, etc. on distributed storage (e.g. HDFS, S3, GFS), so that when the
JobManager fails over, it can fetch the blob data from remote storage. This is very important for standalone and YARN deployments, since
the local blob store is ephemeral and is cleaned up after the JobManager terminates.

However, in your case, thanks to the K8s persistent volume, all the local blob data can be recovered after the JobManager pod restarts.
That is why you see the jobs recover and keep running. Please also remember that the checkpoint metadata and counter also
need to be stored in local files. Only then can the Flink job recover from the latest checkpoint successfully.

> About the FileSystemHaService
I am a little skeptical about this feature. Currently, we use a K8s Deployment for the JobManager, and it is not always guaranteed that only
one JobManager is running, for example when the kubelet goes down and is never brought up again. I am working on another ticket, "native K8s HA" [1],
in which we will get a fully functional HA service, including leader election/retrieval, job graph metadata store, checkpoint metadata store, running jobs registry, etc.
It could be used for both standalone K8s and native K8s deployments.



Best,
Yang



Re: FileSystemHaServices and BlobStore

Alexey Trenikhun
Judging from FLIP-19 (thank you, Roman, for the link), of the 3 use cases (jars, RPC messages and log files), only jar files need an HA guarantee. In my particular case, a job cluster with the jar as part of the image, it does not seem to matter; I guess that explains why in my test I was able to recover from failure even with VoidBlobStore. I also use a StatefulSet instead of a Deployment.

Thanks,
Alexey



Re: FileSystemHaServices and BlobStore

Yang Wang
Hi Alexey,

Thanks for the feedback. You are right. StatefulSet + PersistentVolume + FileSystemHaService could be another
bundle of services for Flink HA support on K8s. The user jars could be built into the image, downloaded by an init container,
or mounted via the PV, so they do not need to be recovered from HA storage. But I think the checkpoint path and counter
should be persisted so that we can recover from the latest checkpoint.


Best,
Yang
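
As an illustration of what persisting the counter could look like, here is a minimal sketch of a counter backed by a single file on the persistent volume. `FileBackedCounter` is a hypothetical name and does not implement Flink's `CheckpointIDCounter` interface; the sketch also assumes a single writer and a POSIX-like file system where a rename replaces the target atomically, and it does not force the data to disk.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical file-backed counter (assumption: one writer, directory already exists). */
final class FileBackedCounter {
    private final Path file;
    private long next;

    /** Loads the next ID from the file, or starts at 1 if the file does not exist yet. */
    FileBackedCounter(Path file) {
        this.file = file;
        try {
            this.next = Files.exists(file)
                    ? Long.parseLong(new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim())
                    : 1L;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the current ID after writing the following one to the file. */
    synchronized long getAndIncrement() {
        long current = next;
        persist(current + 1);
        next = current + 1;
        return current;
    }

    private void persist(long value) {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try {
            Files.write(tmp, Long.toString(value).getBytes(StandardCharsets.UTF_8));
            // Write to a temporary file and rename it, so a crash in the middle of the
            // write does not leave a half-written counter behind.
            Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempDirectory("counter-demo").resolve("checkpoint-id");
        FileBackedCounter first = new FileBackedCounter(file);
        first.getAndIncrement();
        first.getAndIncrement();
        // A fresh instance stands in for a restarted JobManager reading the same volume.
        System.out.println(new FileBackedCounter(file).getAndIncrement());
    }
}
```

The second instance continues from where the first one stopped instead of starting over at 1, which is the property a restarted JobManager needs in order not to reuse checkpoint IDs.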


Re: FileSystemHaServices and BlobStore

Alexey Trenikhun
Hi Yang,
Yes, I’ve persisted CompletedCheckpointStore, CheckpointIDCounter and RunningJobsRegistry

Thanks,
Alexey
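
For readers wondering what a persisted job registry might involve, the following is a minimal sketch that stores one small status file per job. `FileJobRegistry` and its methods are hypothetical and are not Flink's `RunningJobsRegistry`; the three status values are an assumption chosen for illustration, and the sketch assumes a single JobManager writes to the directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical file-backed registry of job states (assumption: single writer). */
final class FileJobRegistry {
    enum Status { PENDING, RUNNING, DONE }

    private final Path dir;

    FileJobRegistry(Path dir) {
        this.dir = dir;
        try {
            Files.createDirectories(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Records the status of a job in a file named after the job ID. */
    void setStatus(String jobId, Status status) {
        try {
            Files.write(dir.resolve(jobId), status.name().getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the recorded status; a job without a file is treated as not yet started. */
    Status getStatus(String jobId) {
        Path file = dir.resolve(jobId);
        if (!Files.exists(file)) {
            return Status.PENDING;
        }
        try {
            return Status.valueOf(new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("registry-demo");
        new FileJobRegistry(dir).setStatus("job-1", Status.DONE);
        // A new instance stands in for a restarted JobManager reading the same volume.
        System.out.println(new FileJobRegistry(dir).getStatus("job-1"));
    }
}
```

Because the status lives in the directory rather than in memory, a restarted process can tell that a job already finished and avoid running it again.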



Re: FileSystemHaServices and BlobStore

Yang Wang
Hi Alexey,

I have created FLIP-144 [1] for the native Kubernetes HA support. Please have a look if you are interested.
Frankly speaking, I am not against "StatefulSet + PV + FileSystemHaService". Maybe in the future
we could have both in Flink.


Best,
Yang

> but not clear is to due to having multiple JobManagers (leader + follower)
> or necessity to preserve BLOBs on restart.
>
> Thanks,
> Alexey
>
>

Re: FileSystemHaServices and BlobStore

Alexey Trenikhun
Hi Yang,
I saw this FLIP; it is a very good feature. I think that overall, for Kubernetes, it is preferable to the “StatefulSet + PV + FileSystemHAService” approach, and we plan to use it when it becomes available. On the other hand, FileSystemHAService looks easier to implement. I thought about contributing FileSystemHAService; maybe it will be useful to someone else until FLIP-144 is available.

Thanks,
Alexey

From: Yang Wang <[hidden email]>
Sent: Wednesday, September 16, 2020 8:25 PM
To: Alexey Trenikhun
Cc: dev; Flink User Mail List
Subject: Re: FileSystemHaServices and BlobStore
 
Hi Alexey,

I have created FLIP-144[1] for the native Kubernetes HA support. Please have a look if you are interested.
Frankly speaking, I am not against "StatefulSet + PV + FileSystemHAService". Maybe in the future
we could have both in Flink.


Best,
Yang

On Thu, Sep 3, 2020 at 11:44 PM Alexey Trenikhun <[hidden email]> wrote:
Hi Yang,
Yes, I’ve persisted CompletedCheckpointStore, CheckpointIDCounter and RunningJobsRegistry

Thanks,
Alexey


From: Yang Wang <[hidden email]>
Sent: Wednesday, September 2, 2020 8:21:20 PM
To: Alexey Trenikhun <[hidden email]>
Cc: dev <[hidden email]>; Flink User Mail List <[hidden email]>
Subject: Re: FileSystemHaServices and BlobStore
 
Hi Alexey,

Thanks for the feedback. You are right. StatefulSet + PersistentVolume + FileSystemHaService could be another
bundle of services for Flink HA support on K8s. The user jars could be built into the image, downloaded by an init-container,
or mounted via the PV, so they do not need to be recovered from HA storage. But I think the checkpoint path and counter
should be persisted so that we can recover from the latest checkpoint.
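
To make the "checkpoint path should be persisted" point concrete, here is a small Java sketch of a pointer file that remembers the newest completed checkpoint. Everything in it is an assumption made for illustration: the class LatestCheckpointPointer, the one-line "<id> <path>" file format and the example metadata path are hypothetical and are not part of Flink's CompletedCheckpointStore.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

// Hypothetical helper, not a Flink class: remembers where the metadata of the
// newest completed checkpoint lives so a restarted JobManager can find it.
final class LatestCheckpointPointer {
    /** Checkpoint ID plus the location of its metadata (format is an assumption). */
    static final class Entry {
        final long checkpointId;
        final String metadataPath;

        Entry(long checkpointId, String metadataPath) {
            this.checkpointId = checkpointId;
            this.metadataPath = metadataPath;
        }
    }

    private final Path pointerFile;

    LatestCheckpointPointer(Path pointerFile) {
        this.pointerFile = pointerFile;
    }

    /** Overwrites the pointer; intended to be called once the checkpoint is complete. */
    synchronized void record(long checkpointId, String metadataPath) throws IOException {
        Path tmp = pointerFile.resolveSibling(pointerFile.getFileName() + ".tmp");
        String line = checkpointId + " " + metadataPath;
        Files.write(tmp, line.getBytes(StandardCharsets.UTF_8));
        // Rename over the old pointer so a reader sees either the old or the new entry.
        Files.move(tmp, pointerFile, StandardCopyOption.ATOMIC_MOVE);
    }

    /** Returns the recorded entry, or empty if nothing has been recorded yet. */
    synchronized Optional<Entry> latest() throws IOException {
        if (!Files.exists(pointerFile)) {
            return Optional.empty();
        }
        String line = new String(Files.readAllBytes(pointerFile), StandardCharsets.UTF_8).trim();
        int space = line.indexOf(' ');
        long id = Long.parseLong(line.substring(0, space));
        return Optional.of(new Entry(id, line.substring(space + 1)));
    }
}
```

On restart, a JobManager in this kind of setup would read the pointer first and restore from the recorded metadata path. If the pointer is absent, the job starts without restored state.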


Best,
Yang

On Wed, Sep 2, 2020 at 7:36 AM Alexey Trenikhun <[hidden email]> wrote:
Judging from FLIP-19 (thank you Roman for the link), of the 3 use cases (jars, RPC messages and log files) only jar files need an HA guarantee, and in my particular case, a job cluster with the jar as part of the image, it doesn't seem to matter. I guess that explains why in my test I was able to recover from failure even with VoidBlobStore. I also use a StatefulSet instead of a Deployment.

Thanks,
Alexey


From: Yang Wang <[hidden email]>
Sent: Tuesday, September 1, 2020 1:58 AM
To: dev <[hidden email]>
Cc: Alexey Trenikhun <[hidden email]>; Flink User Mail List <[hidden email]>
Subject: Re: FileSystemHaServices and BlobStore
 