JobManager HA without Distributed FileSystem


JobManager HA without Distributed FileSystem

snntr
Hi all,

the documentation on JobManager HA [1] explains that HA is only possible
with the FS state backend, as the JobManager metadata is saved there.

What are the particular problems with using JobManager HA with the
MemoryStateBackend?

As I understand it, the state is checkpointed to all JobManagers
(leader + standbys) when using the MemoryStateBackend, or am I wrong here?

Follow-up question: Is it generally possible to set up a highly
available, at-least-once (source: Kafka) pipeline without a distributed
filesystem (only local FS and ZooKeeper) for the checkpoints?
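
For concreteness, this is roughly the configuration I have in mind; just
a sketch with placeholder hosts and paths (key names as in the recent
docs; older versions use the recovery.mode / recovery.zookeeper.* keys
instead):

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
    # local path instead of an HDFS/S3 URI -- this is the part in question
    high-availability.storageDir: file:///var/flink/recovery
    # keep checkpointed state on the JobManager heap (MemoryStateBackend)
    state.backend: jobmanager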

Cheers,

Konstantin


[1]
https://ci.apache.org/projects/flink/flink-docs-master/setup/jobmanager_high_availability.html

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: JobManager HA without Distributed FileSystem

Stephan Ewen
Hi!

The state one can store in ZooKeeper is only very small (recommended: smaller than 1 MB per handle).

For HA, the JobManager needs to persist:
  - JobGraph
  - JAR files
  - Checkpoint Metadata

Those easily grow too large for ZooKeeper, which is why Flink currently requires a DFS to store them and keeps only "pointers" to the data in the DFS in ZooKeeper.
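
As a simplified sketch of that pattern (hypothetical class and path
names, not Flink's actual internal classes):

    import java.io.Serializable;

    // ZooKeeper only ever sees a small, serializable handle like this one;
    // the JobGraph / JARs / checkpoint metadata themselves live in the DFS.
    class DfsPointerHandle implements Serializable {
        final String dfsPath; // e.g. "hdfs:///flink/recovery/submittedJobGraph-42"

        DfsPointerHandle(String dfsPath) {
            this.dfsPath = dfsPath;
        }
    }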

Are you thinking of another highly available storage for larger data (megabytes) that could be used here?

Greetings,
Stephan


Re: JobManager HA without Distributed FileSystem

snntr
Hi Stephan,

thanks for the quick response, understood. Is there a reason why the JAR
files and the JobGraph are not sent to all JobManagers by the client?
Similarly, why don't all TaskManagers send the checkpoint metadata to all
JobManagers?

I did not have any other storage in mind [1]. I am basically interested
in what is possible with the MemoryStateBackend alone. So, from here on,
let's set JM HA aside.

For a stand-alone Flink cluster with the MemoryStateBackend (default
config), I can only have 1 MB (akka.framesize) of state per handle,
correct?

There is one handle per operator, correct?

So, for example, take a KafkaConsumer with parallelism 2 consuming from
a topic with 20 partitions:

Two operator instances, each with a state of a HashMap<KafkaTopicPartition,
Long> with 10 entries. KafkaTopicPartition has the fields String, int,
int, so this should amount to < 1 KB if the topic name is of reasonable
length.
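
As a quick back-of-envelope check (a sketch using plain Java
serialization and a hypothetical stand-in for KafkaTopicPartition, just
to get the order of magnitude):

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    public class OffsetStateEstimate {

        // stand-in for Flink's KafkaTopicPartition, for the estimate only
        static class TopicPartition implements Serializable {
            final String topic;
            final int partition;
            TopicPartition(String topic, int partition) {
                this.topic = topic;
                this.partition = partition;
            }
        }

        public static void main(String[] args) throws Exception {
            // 10 entries per subtask: 20 partitions / parallelism 2
            Map<TopicPartition, Long> offsets = new HashMap<>();
            for (int p = 0; p < 10; p++) {
                offsets.put(new TopicPartition("my-topic", p), 1234567890L);
            }

            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(offsets);
            }
            // prints a few hundred bytes -- far below the 1 MB akka.framesize
            System.out.println("serialized size: " + buf.size() + " bytes");
        }
    }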

So, if this is the only state in the pipeline, there is no problem using
the MemoryStateBackend, provided one accepts that a JM failure means a
loss of the state? In the case of the KafkaConsumer, the current offsets
are also committed to Kafka/ZooKeeper anyway, so there would not be any
loss of data even in this case, just duplication.
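
For reference, the kind of consumer setup I mean looks roughly like this
(a sketch with placeholder topic and broker names; FlinkKafkaConsumer09
as an example, other connector versions are analogous):

    import java.util.Properties;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaPipelineSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000); // checkpoint every 5 s

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka1:9092"); // placeholder
            props.setProperty("group.id", "my-pipeline"); // offsets committed under this group

            DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props));

            stream.print();
            env.execute("kafka-pipeline-sketch");
        }
    }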

Does this make sense?

Cheers,

Konstantin

[1] We did consider reviving FLINK-3035 (Redis Statebackend), but that's
a different topic ;)



Re: JobManager HA without Distributed FileSystem

Stephan Ewen
Hi!

  - Concerning replication to the other JobManagers: this could be an extension, but it would also need to support replacement JobManagers coming up later, so it would require a replication service in the JobManagers, not just a "send to all" at program startup.

  - That would work in theory like this, yes, assuming the JobGraph storage were solved (the JARs can always be added to the "lib" directory in the first place).
    Right now, Flink does not accept such a setting, but one could definitely think about that configuration. We are currently abstracting the HighAvailability services for different configurations; this could be one of them.

Stephan

