Hi all,
the documentation of JobManager HA [1] explains that HA is only possible with the FS state backend, as the JobManager metadata is saved there.

What are the particular problems with using JobManager HA together with the MemoryStateBackend?

As I understand it, the state is checkpointed to all JobManagers (leader + standbys) when using the MemoryStateBackend, or am I wrong here?

Follow-up question: Is it generally possible to set up a highly available, at-least-once (source: Kafka) pipeline without a distributed filesystem (only local FS and ZooKeeper) for the checkpoints?

Cheers,

Konstantin

[1] https://ci.apache.org/projects/flink/flink-docs-master/setup/jobmanager_high_availability.html

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
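For illustration, the kind of pipeline asked about above might look roughly like the following sketch. The topic name, broker address, group id, parallelism, and the Kafka 0.9 connector class are placeholders/assumptions for a Flink 1.1-era setup, not anything taken from the thread:

    import java.util.Properties;

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaAtLeastOncePipeline {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpointing must be enabled for the Kafka source to give
            // at-least-once guarantees (offsets are part of the checkpointed state).
            env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
            props.setProperty("group.id", "demo-group");              // placeholder

            // Kafka source with parallelism 2, reading from a 20-partition topic.
            DataStream<String> stream = env.addSource(
                    new FlinkKafkaConsumer09<>("demo-topic", new SimpleStringSchema(), props))
                    .setParallelism(2);

            stream.print();

            env.execute("Kafka at-least-once pipeline");
        }
    }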
Hi!

The state one can store in ZooKeeper is only very small (recommended is smaller than 1 MB per handle).

For HA, the JobManager needs to persist:
  - JobGraph
  - JAR files
  - Checkpoint metadata

Those are easily too large for ZooKeeper, which is why Flink currently requires a DFS to store them, and only stores "pointers" to the data in the DFS in ZooKeeper.

Are you thinking of another highly available storage for larger data (megabytes) that could be used here?

Greetings,
Stephan

On Tue, Aug 23, 2016 at 6:36 PM, Konstantin Knauf <[hidden email]> wrote:
> [...]
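For reference, a ZooKeeper-based HA setup along those lines is configured roughly like this in flink-conf.yaml. Hostnames and paths are placeholders, and the exact key names vary between Flink versions (older releases used recovery.* keys instead of high-availability.*):

    # ZooKeeper coordinates the leader election and holds only small "pointers"
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181,zk-host3:2181

    # The actual JobGraph, JAR files and checkpoint metadata go to this
    # (distributed) filesystem path:
    high-availability.storageDir: hdfs:///flink/ha/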
Hi Stephan,
thanks for the quick response, understood.

Is there a reason why the JAR files and the JobGraph are not sent to all JobManagers by the client? Accordingly, why don't all TaskManagers send the checkpoint metadata to all JobManagers?

I did not have any other storage in mind [1]. I am basically interested in what is possible with the MemoryStateBackend alone. So, from here on, let's set JM HA aside.

For a standalone Flink cluster with the MemoryStateBackend (default config) I can only have 1 MB (akka.framesize) of state per handle, correct? There is one handle per operator, correct?

So, for example, a KafkaConsumer with parallelism 2 consuming from a topic with 20 partitions: two operator instances, each with a state of a HashMap<KafkaTopicPartition, Long> with 10 entries. KafkaTopicPartition has the fields String, int, int, so this should amount to < 1 kB if the name of the partition is of reasonable length.

So, if this is the only state in the pipeline, there is no problem using the MemoryStateBackend, if one accepts that a JM failure means a loss of the state? In the case of the KafkaConsumer, the current offsets are also stored in Kafka/ZooKeeper anyway, so actually there would not be any loss of data even in this case, just duplication. Does this make sense?

Cheers,

Konstantin

[1] We did consider reviving FLINK-3035 (Redis state backend), but that's a different topic ;)

On 23.08.2016 20:45, Stephan Ewen wrote:
> [...]

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
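For reference, a minimal sketch of the setup meant above, with the MemoryStateBackend configured explicitly. The maxStateSize constructor argument (in bytes) is, to my understanding, the relevant per-handle knob; the 1 MB value and the trivial source are just examples, not taken from the thread:

    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MemoryBackendSetup {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Keep checkpointed state on the heap; the argument caps the size
            // of each individual state handle (in bytes).
            env.setStateBackend(new MemoryStateBackend(1024 * 1024)); // ~1 MB per handle

            env.enableCheckpointing(5000); // checkpoint every 5 s

            // Placeholder pipeline with only tiny state, as discussed above.
            env.fromElements(1, 2, 3).print();

            env.execute("MemoryStateBackend example");
        }
    }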
Hi!

- Concerning replication to the other JobManagers: this could be an extension, but it would also need to support additional replacement JobManagers coming up later, so it would require a replication service in the JobManagers, not just a "send to all" at program startup.

- That would work in theory like this, yes, assuming the JobGraph storage were solved (the JARs can always be added to the "lib" directory in the first place). Right now, Flink does not accept that setting, but one could think about that configuration, definitely. We are currently abstracting the HighAvailability services for different configurations; this could be one of them.

Stephan

On Wed, Aug 24, 2016 at 9:30 AM, Konstantin Knauf <[hidden email]> wrote:
> [...]