partial savepoints/combining savepoints


partial savepoints/combining savepoints

Claudia Wegmann

Hey everyone,

 

I’ve got some questions regarding savepoints in Flink. I have the following situation:

 

There is a microservice that reads data from Kafka topics, creates Flink streams from this data, and runs various computation/pattern-matching workloads on them. If the overall workload for this service becomes too big, I want to start a new instance of the service and share the work between the running instances. To accomplish that, I thought about using Flink's savepoint mechanism. But there are some open questions:

 

1.)    Can I combine two or more savepoints in one program?
Think of two services already running. Now I'm starting up a third service. The new one would get savepoints from the already running services. It would then continue computation on some streams, while the other services would stop computing the streams now handled by the new service. So, is it possible to combine two or more savepoints in one program?

2.)    Another approach I could think of for introducing a new service would be to take a savepoint of only the streams that change service. Can I take a savepoint of just a part of the running job?

Thanks for your comments and best wishes,

Claudia


Re: partial savepoints/combining savepoints

Till Rohrmann
Hi Claudia,

Unfortunately, neither taking partial savepoints nor combining multiple savepoints into one savepoint is currently supported by Flink.

However, we're currently working on dynamic scaling, which will allow you to adjust the parallelism of your Flink job. This helps you scale in/out depending on the workload of your job. Note, though, that you would only be able to scale within a single Flink job and not across Flink jobs.

Cheers,
Till
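
For reference, the savepoint workflow that is supported always operates on the whole job. Roughly, with the Flink 1.x CLI (the job ID, savepoint path, and JAR name below are placeholders):

    # Trigger a savepoint for a running job; the CLI prints the savepoint path
    bin/flink savepoint <jobID>

    # Stop the job once the savepoint has been taken
    bin/flink cancel <jobID>

    # Resume a (possibly modified) job from the savepoint
    bin/flink run -s <savepointPath> my-job.jar

Merging two such savepoints, or restoring only a subset of the operators, is exactly what is not possible at the moment.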


Re: partial savepoints/combining savepoints

Claudia Wegmann

Hi Till,

 

thanks for the quick reply. Too bad, I thought I was on the right track with savepoints here.

 

Some follow-up questions:

 

1.)    Could I transfer the state and the position in the Kafka topic manually for one stream? In other words: is this information easily accessible?

2.)    In any case I would need to stop the running job, change the topology (e.g. the number of streams in the program), and resume processing. Can you quantify the time overhead of stopping and starting a Flink job?

3.)    I'm aware of the upcoming feature for scaling in and out. But I don't quite see how this will help me with different services.
I thought of each service having its own Flink instance/cluster. I would submit the service as one job to its dedicated Flink cluster, containing all the necessary streams and computations. Is this a bad architecture?
Would it be better to have one big Flink cluster and submit one big job that contains all the streams? (As far as I know, submitting multiple jobs to one Flink instance is not recommended.)
To be honest, I don't yet fully understand the different deployment options of Flink and how to combine them with a microservice architecture, where I have a service packaged as a JAR file that I just want to deploy. I thought of this service containing Flink itself: it would start the JobManager and some TaskManagers and deploy itself as the Flink job with a dedicated entry point. Is this a good idea? Is it even possible?

 

Thanks in advance,

Claudia

 


Re: partial savepoints/combining savepoints

Till Rohrmann
Hi Claudia,

1) At the moment the offset information is written both to the ZooKeeper quorum used by Kafka and to the savepoint. Reading it out of the savepoint is not easy, since you would need to know the savepoint's internal representation. But you could try to read the Kafka offsets from ZooKeeper.
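
If you go the ZooKeeper route, a minimal sketch could look like the following, assuming the classic Kafka 0.8 znode layout /consumers/<group>/offsets/<topic>/<partition> (the quorum address, group, topic, and partition are placeholders):

    import org.apache.zookeeper.ZooKeeper;

    public class KafkaOffsetReader {
        public static void main(String[] args) throws Exception {
            // Connect to the ZooKeeper quorum used by Kafka.
            ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, event -> { });
            // Kafka 0.8 stores committed offsets under this znode path.
            String path = "/consumers/my-group/offsets/my-topic/0";
            byte[] data = zk.getData(path, false, null);
            // The offset is stored as a UTF-8 encoded decimal string.
            long offset = Long.parseLong(new String(data, "UTF-8"));
            System.out.println("Committed offset: " + offset);
            zk.close();
        }
    }

Note that this only gives you the offsets; the operator state itself would still have to be rebuilt by other means.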

2) That depends a bit on the deployment and the size of the job. Are you using a YARN session or a standalone cluster? In that case the TaskManagers are already registered at the JobManager, and the deployment of each task should be in the millisecond range. If you start a new YARN application per Flink job (a per-job cluster), then it might take a bit longer, depending on how long YARN needs to allocate the requested resources. But once that is done, the deployment of a task should be in the sub-second range.

3) If you want to keep the different Flink jobs separated, then you should either submit them separately to a Flink cluster or start a Flink cluster per job (e.g. with YARN). I don't think that this is a bad architecture if you want to fulfil these requirements. However, I'm not sure whether merging and splitting savepoints will be implemented anytime soon.

Actually, we're currently working on improving Flink so that it can be started with a dedicated job. This means that you start a JobManager which already has the job JAR on its classpath and directly starts executing the contained job. This will be helpful for deployment scenarios such as those that arise when using Docker images, for example. I could imagine that this would be helpful for your use case as well.

Cheers,
Till


Re: partial savepoints/combining savepoints

Claudia Wegmann

Hi again,

 

I was thinking this over and tried some stuff. Some questions remain, though:

 

To 2)

a) I have only used standalone mode so far. What would be the upsides of using YARN?

b) Does an easy way to start Flink (JobManager+TaskManagers) programmatically already exist?

c) I tried to build my own StreamingEnvironments (local and remote) to add a function for stopping the job. This works quite well locally: I can just call the stop() method of the FlinkMiniCluster with the given name of the job. To test this, I build a streaming topology, execute it, call stop, then build a new topology and execute it. However, I couldn't accomplish the same in an easy way for the remote environment. I tried to call the stop() method of the client with the JobID obtained from the JobGraph, which in turn I got from the StreamGraph. But this doesn't seem to work. When I run the test program on my Flink cluster, only the first topology is executed; nothing happens after the first execution finishes. So I guess the remote stop() method doesn't work and the second execute() is simply ignored. Should the described process of stopping work? Or what is it I'm doing wrong?

Btw, where does the output of a print() call go in remote execution? There is no output in the web frontend (jobmanager/stdout). I guess I have to look for a file on the JobManager node? What would be the name/directory?

 

To 3)

What's the consequence of the Flink jobs not being separated? I guess I wouldn't need to manage more than one cluster setup, but could then only run one job at a time?

 

Thanks a lot again,

Claudia

 

 


Re: partial savepoints/combining savepoints

Till Rohrmann
Hi Claudia,

On Wed, Aug 17, 2016 at 5:06 PM, Claudia Wegmann <[hidden email]> wrote:

Hi again,

 

I was thinking this over and tried some stuff. Some questions remain, though:

 

To 2)

a) I have only used standalone mode so far. What would be the upsides of using YARN?


The upside is that you're using a cluster resource management system, which can give you better HA guarantees, for example. It will basically allocate resources for you, so you don't have to take care of that yourself. Once you're done with your processes, these resources are returned and can be used by another YARN application.
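
For example, a YARN session can be started roughly like this (the flag values are placeholders; check the documentation of your Flink version for the exact options):

    # 4 TaskManager containers, 2 slots each, 4 GB per TaskManager
    bin/yarn-session.sh -n 4 -s 2 -tm 4096

Jobs submitted afterwards with bin/flink run are then executed on that session.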


b) Does an easy way to start Flink (JobManager+TaskManagers) programmatically already exist?


What do you mean here? Do you mean a local cluster? Then you could use the FlinkMiniCluster. Otherwise, there are scripts in the flink/bin directory which allow you to start up a Flink cluster.
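
For the local case, a minimal sketch could look like this; createLocalEnvironment() starts an embedded JobManager and TaskManager in the current JVM for the duration of the job (the job itself is just a placeholder):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class EmbeddedFlink {
        public static void main(String[] args) throws Exception {
            // Spins up an in-process mini cluster with parallelism 2.
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironment(2);
            env.fromElements("a", "b", "c").print();
            env.execute("embedded-job");
        }
    }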
 

c) I tried to build my own StreamingEnvironments (local and remote) to add a function for stopping the job. This works quite well locally: I can just call the stop() method of the FlinkMiniCluster with the given name of the job. To test this, I build a streaming topology, execute it, call stop, then build a new topology and execute it. However, I couldn't accomplish the same in an easy way for the remote environment. I tried to call the stop() method of the client with the JobID obtained from the JobGraph, which in turn I got from the StreamGraph. But this doesn't seem to work. When I run the test program on my Flink cluster, only the first topology is executed; nothing happens after the first execution finishes. So I guess the remote stop() method doesn't work and the second execute() is simply ignored. Should the described process of stopping work? Or what is it I'm doing wrong?


The stop signal is only supported by some streaming sources. A streaming source is stoppable if it implements the StoppableFunction interface; you can check for that. If you want to terminate a running job, you should cancel it. That is currently the only way to stop an arbitrary job. We'll improve the stop signal in the future to gracefully stop a job with respect to external systems and to take a savepoint.
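
A minimal sketch of a stoppable source, assuming the Flink 1.x SourceFunction and StoppableFunction interfaces (the emitted counter and the sleep interval are placeholders):

    import org.apache.flink.api.common.functions.StoppableFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class StoppableCounterSource
            implements SourceFunction<Long>, StoppableFunction {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long counter = 0;
            while (running) {
                // Hold the checkpoint lock while emitting so that
                // checkpoints see a consistent state.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(counter++);
                }
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
            // Hard cancellation: stop emitting immediately.
            running = false;
        }

        @Override
        public void stop() {
            // Graceful stop; a real source would flush pending data or
            // commit offsets to the external system first.
            running = false;
        }
    }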

 

Btw, where does the output of a print() call go in remote execution? There is no output in the web frontend (jobmanager/stdout). I guess I have to look for a file on the JobManager node? What would be the name/directory?


In the remote case, the print output is written to the JobManager's/TaskManager's .out file, which you can find in the log directory.
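
(In a standalone setup these files typically follow a naming pattern along the lines of log/flink-<user>-taskmanager-<n>-<host>.out on each machine; the exact names depend on the Flink version and setup.)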

 

To 3)

What's the consequence of the Flink jobs not being separated? I guess I wouldn't need to manage more than one cluster setup, but could then only run one job at a time?


The consequence is that multiple concurrently running jobs can affect each other, because they run in the same processes. This can have security implications, for example.

 

Thanks a lot again,

Claudia

 

 
