Incremental updates

Incremental updates

Malgorzata Kudelska

Hi,
I have the following question. Does Flink support incremental updates?

In particular, I have a custom StateValue object and during the checkpoints I would like to save only the fields that changed since the previous checkpoint. Is that possible?

Regards,
Gosia

Re: Incremental updates

Rubén Casado
Hi Gosia

You can have a look at the PROTEUS project we are working on [1]. We are implementing incremental versions of analytics operations. For example, [2] contains the implementation of the incremental AVG. Maybe the code can give you some ideas :-)


[1] https://github.com/proteus-h2020/proteus-backend/tree/development
[2] https://github.com/proteus-h2020/proteus-backend/blob/development/src/main/java/com/treelogic/proteus/flink/incops/IncrementalAverage.java
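
To illustrate what "incremental AVG" means in general, here is a minimal sketch in plain Java. It is not the PROTEUS code from [2]; the class and method names are hypothetical, and it only shows the idea of folding each new value into a running mean instead of recomputing over all values.

```java
/** Running average updated one element at a time (illustrative sketch, hypothetical names). */
final class IncrementalAverage {
    private long count;   // number of values seen so far
    private double mean;  // average of the values seen so far

    /** Folds one new value into the average without revisiting earlier values. */
    double add(double value) {
        count++;
        mean += (value - mean) / count;
        return mean;
    }

    double mean() {
        return mean;
    }

    long count() {
        return count;
    }

    public static void main(String[] args) {
        IncrementalAverage avg = new IncrementalAverage();
        for (double v : new double[] {2.0, 4.0, 6.0}) {
            System.out.println("value=" + v + " running average=" + avg.add(v));
        }
    }
}
```

Only the count and the current mean need to be kept as state, which is what makes this kind of operation a natural fit for per-key state in a streaming job.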

______________________________________

Dr. Rubén Casado
Head of Big Data
Treelogic
ruben.casado.treelogic

+34 902 286 386 - +34 607 18 28 06

Parque Tecnológico de Asturias · Parcela 30
E33428 Llanera · Asturias [Spain]
www.treelogic.com
______________________________________


----- Original message -----
From: "Malgorzata Kudelska" <[hidden email]>
To: [hidden email]
Sent: Tuesday, 24 May 2016 22:01:28 GMT +01:00 Amsterdam / Berlin / Bern / Rome / Stockholm / Vienna
Subject: Incremental updates

Hi,
I have the following question. Does Flink support incremental updates?

In particular, I have a custom StateValue object and during the checkpoints I would like to save only the fields that changed since the previous checkpoint. Is that possible?

Regards,
Gosia

Re: Incremental updates

Aljoscha Krettek
Hi Gosia,
right now, Flink does not do incremental checkpoints. Every checkpoint is fully valid in isolation. Incremental checkpointing has come up several times in mailing list discussions, and we are planning to work on it once someone finds some free time.
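
To make the difference concrete, here is a rough sketch in plain Java of a full snapshot versus a snapshot of only the changed fields. It does not hook into Flink's checkpointing in any way; the class, its methods, and the field names are all hypothetical and only illustrate the concept asked about.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** User-level state that remembers which fields changed since the last snapshot (sketch only). */
final class DeltaTrackingState {
    private final Map<String, Long> fields = new HashMap<>();
    private final Set<String> dirty = new HashSet<>();

    /** Sets a field and marks it dirty if the value is new or different. */
    void put(String field, long value) {
        Long previous = fields.put(field, value);
        if (previous == null || previous != value) {
            dirty.add(field);
        }
    }

    /** Full snapshot: contains every field and can be restored on its own. */
    Map<String, Long> fullSnapshot() {
        dirty.clear();
        return new HashMap<>(fields);
    }

    /** Delta snapshot: only the fields changed since the previous snapshot of either kind. */
    Map<String, Long> deltaSnapshot() {
        Map<String, Long> delta = new HashMap<>();
        for (String field : dirty) {
            delta.put(field, fields.get(field));
        }
        dirty.clear();
        return delta;
    }

    public static void main(String[] args) {
        DeltaTrackingState state = new DeltaTrackingState();
        state.put("clicks", 1L);
        state.put("views", 2L);
        System.out.println("full:  " + state.fullSnapshot());
        state.put("views", 3L);
        state.put("clicks", 1L); // unchanged value, so not part of the next delta
        System.out.println("delta: " + state.deltaSnapshot());
    }
}
```

Note the trade-off: a delta can only be restored together with the full snapshot it builds on, whereas a full snapshot is valid in isolation.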

Cheers,
Aljoscha

On Wed, 25 May 2016 at 09:29 Rubén Casado <[hidden email]> wrote:
Hi Gosia

You can have a look at the PROTEUS project we are working on [1]. We are implementing incremental versions of analytics operations. For example, [2] contains the implementation of the incremental AVG. Maybe the code can give you some ideas :-)


[1] https://github.com/proteus-h2020/proteus-backend/tree/development
[2] https://github.com/proteus-h2020/proteus-backend/blob/development/src/main/java/com/treelogic/proteus/flink/incops/IncrementalAverage.java

Re: Incremental updates

Malgorzata Kudelska
Hi,
Thanks for your reply.

Is Flink able to detect that an additional server joined and rebalance the processing? How is it done if I have a keyed stream and some custom ValueState variables?

Cheers, 
Gosia

2016-05-25 11:32 GMT+02:00 Aljoscha Krettek <[hidden email]>:
Hi Gosia,
right now, Flink does not do incremental checkpoints. Every checkpoint is fully valid in isolation. Incremental checkpointing has come up several times in mailing list discussions, and we are planning to work on it once someone finds some free time.

Cheers,
Aljoscha

Re: Incremental updates

Aljoscha Krettek
Hi,
right now, this does not work, but we are also actively working on that. This is the design doc for part one of the necessary changes: https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit?usp=sharing

Cheers,
Aljoscha

On Wed, 25 May 2016 at 13:32 Malgorzata Kudelska <[hidden email]> wrote:
Hi,
Thanks for your reply.

Is Flink able to detect that an additional server joined and rebalance the processing? How is it done if I have a keyed stream and some custom ValueState variables?

Cheers, 
Gosia

Re: Incremental updates

Malgorzata Kudelska

Hi,
I have the following situation.
- a keyed stream with a key defined as: userId % numberOfPartitions
- a custom flatMap transformation where I use a ValueState variable to keep the state of some calculations for each userId
- my questions are:
1. Does Flink guarantee that users with a given key will always be processed by the same partition, assuming that the number of nodes is constant?
2. What will happen when one node disappears or a new one joins? How will Flink redistribute the users that were processed by the one that disappeared?
3. Will Flink restore the state variables of these users from the last checkpoint and redistribute them to the new processing nodes?
4. How will Flink redistribute the workload when a new node joins?

Cheers,
Gosia

Re: Incremental updates

Aljoscha Krettek
Hi,
first question: are you manually keying by "userId % numberOfPartitions"? Flink internally does roughly "key.hash() % numPartitions" so it is enough to specify the userId as your key.

Now, for your questions:

1. What Flink guarantees is that the state for a key k is always available when an element with key k is being processed. Internally, this means that elements with the same key will be processed by the same partition, though there would be other ways of achieving those guarantees.

2. Right now, when a node disappears the job will fail. Then recovery will kick in and restore from the latest checkpoint on a (possibly) new set of nodes. The system will make sure that the partitions and the state are correctly matched.

3. Also answered by the above, I hope at least :-)

4. This currently does not work, but the ongoing work on this is tracked in https://issues.apache.org/jira/browse/FLINK-3755.
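
The "key.hash() % numPartitions" routing mentioned at the top of this reply can be sketched roughly as follows. This is plain Java, not Flink's internal code; the class and method names are hypothetical, and floorMod is used here only to keep the result non-negative for negative hash codes.

```java
/** Illustrative key-to-partition routing (hypothetical names, not Flink internals). */
final class KeyRouting {

    /** Maps a key to a partition index in the range [0, numPartitions). */
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        long[] userIds = {7L, 42L, 7L, 1001L, 42L};
        for (long userId : userIds) {
            // The same userId always lands on the same partition for a fixed numPartitions.
            System.out.println("userId=" + userId + " -> partition "
                    + partitionFor(userId, numPartitions));
        }
    }
}
```

Since the hashing already spreads keys over the partitions, keying directly by userId is enough, and the state is then scoped per user rather than per "userId % numberOfPartitions" bucket.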

Cheers,
Aljoscha

On Wed, 25 May 2016 at 21:09 Malgorzata Kudelska <[hidden email]> wrote:

Hi,
I have the following situation.
- a keyed stream with a key defined as: userId % numberOfPartitions
- a custom flatMap transformation where I use a ValueState variable to keep the state of some calculations for each userId
- my questions are:
1. Does Flink guarantee that users with a given key will always be processed by the same partition, assuming that the number of nodes is constant?
2. What will happen when one node disappears or a new one joins? How will Flink redistribute the users that were processed by the one that disappeared?
3. Will Flink restore the state variables of these users from the last checkpoint and redistribute them to the new processing nodes?
4. How will Flink redistribute the workload when a new node joins?

Cheers,
Gosia

Re: Incremental updates

Malgorzata Kudelska

Hi,
So is there any possibility to utilize an extra node that joins the cluster, or will it remain idle?
What if I use a custom key function that maps the key variable to a number of keys larger than the initial number of nodes (following the idea from your link)?
What about running Flink on YARN, would that solve anything?

Cheers,
Gosia

On 25 May 2016 at 22:54, "Aljoscha Krettek" <[hidden email]> wrote:
Hi,
first question: are you manually keying by "userId % numberOfPartitions"? Flink internally does roughly "key.hash() % numPartitions" so it is enough to specify the userId as your key.

Now, for your questions:

1. What Flink guarantees is that the state for a key k is always available when an element with key k is being processed. Internally, this means that elements with the same key will be processed by the same partition, though there would be other ways of achieving those guarantees.

2. Right now, when a node disappears the job will fail. Then recovery will kick in and restore from the latest checkpoint on a (possibly) new set of nodes. The system will make sure that the partitions and the state are correctly matched.

3. Also answered by the above, I hope at least :-)

4. This currently does not work, but the ongoing work on this is tracked in https://issues.apache.org/jira/browse/FLINK-3755.

Cheers,
Aljoscha

Re: Incremental updates

Aljoscha Krettek
Hi,
newly added nodes would sit idle, yes. Only when we finish the rescaling work mentioned in the link will we be able to dynamically adapt.

The internal implementation of this will in fact hash keys to a number of groups that is larger than the number of individual partitions and use these "key groups" to allow scaling to differing numbers of partitions. Once this is in, it will also work on YARN. Right now, running on YARN does not allow a job to dynamically pick up new computing resources.
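
As a rough sketch of the key-group idea in plain Java: keys are hashed to a fixed number of key groups, and whole key groups are then assigned to partitions. The constant 128, the contiguous range assignment, and all names below are assumptions made for illustration; the actual design is in the linked document and issue.

```java
/** Illustrative key-group routing (hypothetical names and constants). */
final class KeyGroupRouting {

    /** Fixed number of key groups, assumed larger than any number of partitions in use. */
    static final int NUM_KEY_GROUPS = 128;

    /** A key's group depends only on the key, never on the number of partitions. */
    static int keyGroupFor(Object key) {
        return Math.floorMod(key.hashCode(), NUM_KEY_GROUPS);
    }

    /** Assigns contiguous ranges of key groups to partitions (assumed scheme). */
    static int partitionFor(int keyGroup, int numPartitions) {
        return keyGroup * numPartitions / NUM_KEY_GROUPS;
    }

    public static void main(String[] args) {
        long userId = 42L;
        int keyGroup = keyGroupFor(userId);
        for (int numPartitions : new int[] {4, 8}) {
            System.out.println("userId=" + userId + " keyGroup=" + keyGroup
                    + " partitions=" + numPartitions
                    + " -> partition " + partitionFor(keyGroup, numPartitions));
        }
    }
}
```

The point of the indirection is that state can be stored and moved per key group, so changing the number of partitions only reassigns whole groups instead of rehashing every key.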

Cheers,
Aljoscha

On Thu, 26 May 2016 at 15:50 Malgorzata Kudelska <[hidden email]> wrote:

Hi,
So is there any possibility to utilize an extra node that joins the cluster, or will it remain idle?
What if I use a custom key function that maps the key variable to a number of keys larger than the initial number of nodes (following the idea from your link)?
What about running Flink on YARN, would that solve anything?

Cheers,
Gosia

Re: Incremental updates

Malgorzata Kudelska

Hi,
If I specify the userId as the key variable as you suggested, will the state variables be kept for every observed value of the key? I have a situation where I have a lot of userIds and many of them occur just once, so I don't want to keep the state for them forever. I need the possibility to set a timeout to forget the data regarding users that don't produce any events for a certain amount of time. Is that possible with Flink?
In order to add some custom information for every userId to the checkpointed state, do you suggest making a ValueState variable for a stream keyed by userId? If yes, could you give an example?

Cheers,
Gosia

Re: Incremental updates

Aljoscha Krettek
Hi,
the state will be kept indefinitely but we are planning to introduce a setting that would allow setting a time-to-live on state. I think this is exactly what you would need. As an alternative, maybe you could implement your program using windows? In this way you would also bound how long state is kept.
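
Until such a setting exists, the time-to-live idea would have to be handled by the user. A very rough sketch of the bookkeeping in plain Java follows; it is not a Flink API, all names are hypothetical, and the current time is passed in explicitly so the logic is easy to follow.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Per-user state that is forgotten after a period of inactivity (sketch only). */
final class ExpiringUserState {

    private static final class UserEntry {
        long count;           // events seen for this user
        long lastSeenMillis;  // timestamp of the most recent event
    }

    private final long ttlMillis;
    private final Map<Long, UserEntry> entries = new HashMap<>();

    ExpiringUserState(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Records one event for the user and returns the user's updated event count. */
    long onEvent(long userId, long nowMillis) {
        UserEntry entry = entries.computeIfAbsent(userId, id -> new UserEntry());
        entry.count++;
        entry.lastSeenMillis = nowMillis;
        return entry.count;
    }

    /** Drops users with no event for longer than ttlMillis; returns how many were dropped. */
    int evictExpired(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<Long, UserEntry>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue().lastSeenMillis > ttlMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        ExpiringUserState state = new ExpiringUserState(1_000L);
        state.onEvent(1L, 0L);
        state.onEvent(2L, 500L);
        state.onEvent(1L, 900L);
        System.out.println("evicted: " + state.evictExpired(1_600L));
        System.out.println("remaining users: " + state.size());
    }
}
```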

As for using ValueState, here's a short example:

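import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

DataStream<MyType> stream = ...;
// keying by field name yields a KeyedStream whose key type is Tuple
KeyedStream<MyType, Tuple> keyedStream = stream.keyBy("id");

keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {

    // per-key counter; scoped to the key of the element being processed
    private ValueState<Long> state;

    @Override
    public void open(Configuration cfg) {
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<Long>("count", LongSerializer.INSTANCE, 0L));
    }

    @Override
    public Tuple2<MyType, Long> map(MyType value) throws Exception {
        long count = state.value() + 1;
        state.update(count);  // store the new count, not the element
        return new Tuple2<>(value, count);
    }
});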

Best,
Aljoscha

On Fri, 27 May 2016 at 18:59 Malgorzata Kudelska <[hidden email]> wrote:

Hi,
If I specify the userId as the key variable as you suggested, will the state variables be kept for every observed value of the key? I have a situation where I have a lot of userIds and many of them occur just once, so I don't want to keep the state for them forever. I need the possibility to set a timeout to forget the data regarding users that don't produce any events for a certain amount of time. Is that possible with Flink?
In order to add some custom information for every userId to the checkpointed state, do you suggest making a ValueState variable for a stream keyed by userId? If yes, could you give an example?

Cheers,
Gosia

Hi,
newly added nodes would sit idle, yes. Only when we finish the rescaling work mentioned in the link will we be able to dynamically adapt.

The internal implementation of this will in fact hash keys to a number of "key groups" that is larger than the number of partitions and use these key groups to allow scaling to different numbers of partitions. Once this is in, it will also work on Yarn. Right now, running on Yarn does not allow a job to dynamically pick up new computing resources.
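
A rough plain-Java sketch of the key group idea, under stated assumptions: the class name, the count of 128 key groups, and the range-based assignment of key groups to partitions are illustrative choices, not necessarily what the rescaling work will end up doing.

```java
/** Illustrative only: keys map to a fixed number of key groups, key groups map to partitions. */
final class KeyGroupSketch {

    /** Key group of a key; depends only on the key and the fixed number of key groups. */
    static int keyGroupFor(Object key, int numKeyGroups) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    /** Partition that owns a key group; each partition gets a contiguous range of key groups. */
    static int partitionFor(int keyGroup, int numKeyGroups, int parallelism) {
        return keyGroup * parallelism / numKeyGroups;
    }

    public static void main(String[] args) {
        int numKeyGroups = 128; // assumed value, fixed for the lifetime of the job
        for (String userId : new String[] {"alice", "bob", "carol"}) {
            int group = keyGroupFor(userId, numKeyGroups);
            System.out.println(userId + ": key group " + group
                    + ", partition " + partitionFor(group, numKeyGroups, 4) + " of 4"
                    + ", partition " + partitionFor(group, numKeyGroups, 6) + " of 6");
        }
    }
}
```

The point of the indirection is that a key never changes its key group, so going from 4 to 6 partitions only means handing whole key groups (and their state) to different partitions.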

Cheers,
Aljoscha

On Thu, 26 May 2016 at 15:50 Malgorzata Kudelska <[hidden email]> wrote:

Hi,
So is there any possibility to utilize an extra node that joins the cluster or will it remain idle?
What if I use a custom key function that matches the key variable to a number of keys bigger than the initial number of nodes (following the idea from your link)?
What about running flink on yarn, would that solve anything?

Cheers,
Gosia

On 25 May 2016 at 22:54, "Aljoscha Krettek" <[hidden email]> wrote:
Hi,
first question: are you manually keying by "userId % numberOfPartitions"? Flink internally does roughly "key.hash() % numPartitions" so it is enough to specify the userId as your key.

Now, for your questions:

1. What Flink guarantees is that the state for a key k is always available when an element with key k is being processed. Internally, this means that elements with the same key will be processed by the same partition, though there would be other ways of achieving those guarantees.

2. Right now, when a node disappears the job will fail. Then recovery will kick in and restore from the latest checkpoint on a (possibly) new set of nodes. The system will make sure that the partitions and the state are correctly matched.

3. Also answered by the above, I hope at least :-)

4. This does not currently work, but the ongoing work on this is tracked in https://issues.apache.org/jira/browse/FLINK-3755.

Cheers,
Aljoscha

On Wed, 25 May 2016 at 21:09 Malgorzata Kudelska <[hidden email]> wrote:

Hi,
I have the following situation.
- a keyed stream with a key defined as: userId % numberOfPartitions
- a custom flatMap transformation where I use a StateValue variable to keep the state of some calculations for each userId
- my questions are:
1. Does Flink guarantee that the users with a given key will always be processed by the same partition, assuming that the number of nodes is constant?
2. What will happen when one node disappears or a new one joins? How will Flink redistribute the users that were processed by the one that disappeared?
3. Will Flink restore the state variables of these users from the last checkpoint and redistribute them to the new processing nodes?
4. How will Flink redistribute the workload when a new node joins?

Cheers,
Gosia

Hi,
right now, this does not work but we are also actively working on that. This is the design doc for part one of the necessary changes: https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit?usp=sharing

Cheers,
Aljoscha

On Wed, 25 May 2016 at 13:32 Malgorzata Kudelska <[hidden email]> wrote:
Hi,
Thanks for your reply.

Is Flink able to detect that an additional server joined and rebalance the processing? How is it done if I have a keyed stream and some custom ValueState variables?

Cheers, 
Gosia

2016-05-25 11:32 GMT+02:00 Aljoscha Krettek <[hidden email]>:
Hi Gosia,
right now, Flink is not doing incremental checkpoints. Every checkpoint is fully valid in isolation. Incremental checkpointing came up several times in ML discussions and we are planning to work on it once someone finds some free time.
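
To illustrate the difference, the plain-Java sketch below contrasts a full snapshot with a delta that contains only the changed fields. It is not Flink code; the class and method names are made up, state is modelled as a simple map of field names to values, and removed fields are not handled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Illustrative only: a full snapshot versus a delta holding just the changed fields. */
final class DeltaSnapshotSketch {

    /** Returns the entries of current that are new or differ from previous. */
    static Map<String, Object> delta(Map<String, Object> previous, Map<String, Object> current) {
        Map<String, Object> changed = new HashMap<>();
        for (Map.Entry<String, Object> e : current.entrySet()) {
            if (!previous.containsKey(e.getKey())
                    || !Objects.equals(previous.get(e.getKey()), e.getValue())) {
                changed.put(e.getKey(), e.getValue());
            }
        }
        return changed;
    }

    /** Rebuilds the latest state from a full base snapshot plus one delta. */
    static Map<String, Object> restore(Map<String, Object> base, Map<String, Object> delta) {
        Map<String, Object> state = new HashMap<>(base);
        state.putAll(delta);
        return state;
    }

    public static void main(String[] args) {
        Map<String, Object> first = new HashMap<>();
        first.put("count", 10L);
        first.put("lastUser", "alice");

        Map<String, Object> second = new HashMap<>(first);
        second.put("count", 11L); // only this field changes between checkpoints

        Map<String, Object> changed = delta(first, second);
        System.out.println(changed);                                 // {count=11}
        System.out.println(restore(first, changed).equals(second));  // true
    }
}
```

Note that the delta on its own cannot restore anything; it needs the base snapshot, which is exactly the property a full checkpoint does not depend on.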

Cheers,
Aljoscha

On Wed, 25 May 2016 at 09:29 Rubén Casado <[hidden email]> wrote:
Hi Gosia

You can have a look at the PROTEUS project we are working on [1]. We are implementing incremental versions of analytics operations. For example, you can see in [2] the implementation of the incremental AVG. Maybe the code can give you some ideas :-)


[1] https://github.com/proteus-h2020/proteus-backend/tree/development
[2] https://github.com/proteus-h2020/proteus-backend/blob/development/src/main/java/com/treelogic/proteus/flink/incops/IncrementalAverage.java

______________________________________

Dr. Rubén Casado
Head of Big Data
Treelogic
ruben.casado.treelogic

+34 902 286 386 - +34 607 18 28 06

Parque Tecnológico de Asturias · Parcela 30
E33428 Llanera · Asturias [Spain]
www.treelogic.com
______________________________________


----- Original message -----
From: "Malgorzata Kudelska" <[hidden email]>
To: [hidden email]
Sent: Tuesday, 24 May 2016 22:01:28 GMT +01:00 Amsterdam / Berlin / Bern / Rome / Stockholm / Vienna
Subject: Incremental updates


Hi,
I have the following question. Does Flink support incremental updates?

In particular, I have a custom StateValue object and during the checkpoints I would like to save only the fields that changed since the previous checkpoint. Is that possible?

Regards,
Gosia