Add control mode for flink


liujiangang
Hi everyone,

      Flink jobs are typically long-running. While a job is running, users may want to control it without stopping it. The reasons for such control can vary:

  1. Change the data processing logic, such as a filter condition.

  2. Send trigger events to move the progress forward.

  3. Degrade the job, for example by limiting the input QPS or sampling the data.

  4. Change the log level to debug a current problem.

      The common way to do this today is to stop the job, make the modifications, and restart it. Recovery may take a long time, and in some situations stopping the job is intolerable, for example when the job handles money or important activities. So we need some mechanism to control a running job without stopping it.


We propose to add a control mode for Flink. A control mode based on the REST interface is introduced first. It works in these steps:

  1. The user predefines some logic that supports config control, such as a filter condition.
  2. Run the job.
  3. If the user wants to change the job's running logic, they just send a REST request with the corresponding config.
Other control modes will also be considered in the future. More details can be found in the doc https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing . If the community likes the proposal, more discussion is needed and a more detailed design will be given later. Any suggestions and ideas are welcome.
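
As a hedged illustration of step 1, the predefined logic could be a filter whose condition lives behind an atomic reference that a control endpoint swaps at runtime. All names below are illustrative, not part of the proposal's API:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** A filter whose condition can be replaced at runtime, e.g. by a REST handler. */
class ControllableFilter<T> {
    private final AtomicReference<Predicate<T>> condition;

    ControllableFilter(Predicate<T> initial) {
        this.condition = new AtomicReference<>(initial);
    }

    /** Called from the control path when a new config arrives. */
    void updateCondition(Predicate<T> newCondition) {
        condition.set(newCondition);
    }

    List<T> apply(List<T> input) {
        Predicate<T> p = condition.get(); // read once per batch for a stable view
        return input.stream().filter(p).collect(Collectors.toList());
    }
}
```

The REST handler from step 3 would simply translate the request payload into a new `Predicate` and call `updateCondition`.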


Re: Add control mode for flink

Steven Wu
I am not sure if we should solve this problem in Flink. This is more like a dynamic config problem that probably should be solved by some configuration framework. Here is one post from google search: https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
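
For context, the external dynamic-config approach described here usually boils down to a small poller that periodically reloads settings from a config source. A minimal, framework-agnostic sketch (class and method names are illustrative, not from any specific framework):

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Periodically reloads settings from an external source (file, config service, ...). */
class DynamicConfig implements AutoCloseable {
    private volatile Map<String, String> snapshot;
    private final ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();

    DynamicConfig(Supplier<Map<String, String>> source, long periodMillis) {
        this.snapshot = source.get();
        poller.scheduleAtFixedRate(() -> snapshot = source.get(),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    String get(String key, String defaultValue) {
        return snapshot.getOrDefault(key, defaultValue);
    }

    @Override
    public void close() {
        poller.shutdownNow();
    }
}
```

Operators would read from the latest snapshot; note the poller offers no coordination across parallel readers, which is relevant to the consistency discussion later in the thread.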



Re: Add control mode for flink

Peter Huang
I agree with Steven. This logic can be added in a dynamic config framework that can bind into Flink operators. We probably don't need to let Flink runtime handle it.



Re: Add control mode for flink

liujiangang
In reply to this post by Steven Wu
Thank you for the reply. I have checked the post you mentioned. A dynamic config framework may be useful sometimes, but it is hard to keep the data consistent in Flink. For example, what happens if the dynamic config takes effect during a failover? Since dynamic config is something users desire, maybe Flink can support it in some way.

For the control mode, dynamic config is just one of the control modes. In the Google doc I have listed some other cases, for example, control events generated in operators or by external services. Besides the user's dynamic config, the Flink system can support some common dynamic configuration, like a QPS limit, checkpoint control, and so on.

The control mode structure needs a good design. Based on that, other control features can be added easily later, like changing the log level while the job is running. In the end, Flink will not just process data, but also interact with users and receive control events like a service.
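
The consistency concern can be made concrete with a small simulation: if parallel subtasks each read a shared config independently, an update that lands mid-flight lets two subtasks process the same logical input under different rules. A hedged, non-Flink sketch of the hazard:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Two "subtasks" share a config source but read it at different times. */
class ConsistencyDemo {
    static final AtomicReference<Predicate<Integer>> config =
            new AtomicReference<>(x -> x > 0);

    static List<Integer> runSubtask(List<Integer> records) {
        return records.stream().filter(config.get()).collect(Collectors.toList());
    }

    static boolean divergenceExample() {
        List<Integer> input = Arrays.asList(1, 5, 10);
        List<Integer> a = runSubtask(input);   // subtask A sees the old config
        config.set(x -> x > 5);                // update lands mid-flight
        List<Integer> b = runSubtask(input);   // subtask B sees the new config
        return !a.equals(b);  // same input, different results across subtasks
    }
}
```

Flink's existing control signals (e.g. checkpoint barriers) avoid exactly this by flowing in-band with the data, which is why an out-of-band config framework struggles here.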



Re: Add control mode for flink

Xintong Song
Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.

I was part of the preliminary offline discussions before this proposal went public. So maybe I can help clarify things a bit.

In short, while the phrase "control mode" might be a bit misleading, what we truly want to do, from my side, is to make the concept of "control flow" explicit and expose it to users.

## Background
Jiangang & his colleagues at Kuaishou maintain an internal version of Flink. One of their custom features is allowing dynamically changing operator behaviors via the REST APIs. He's willing to contribute this feature to the community, and came to Yun Gao and me for suggestions. After discussion, we feel that the underlying question to be answered is how do we model the control flow in Flink. Dynamically controlling jobs via REST API can be one of the features built on top of the control flow, and there could be others.

## Control flow
Control flow refers to the communication channels for sending events/signals to/between tasks/operators, which change Flink's behavior in a way that may or may not affect the computation logic. Typical control events/signals that Flink currently has are watermarks and checkpoint barriers.

In general, for modeling control flow, the following questions should be considered:
1. Who (which component) is responsible for generating the control messages?
2. Who (which component) is responsible for reacting to the messages?
3. How do the messages propagate?
4. When it comes to affecting the computation logic, how should the control flow work together with the exactly-once consistency guarantees?

1) & 2) may vary depending on the use case, while 3) & 4) probably have many things in common. A unified control flow model would help deduplicate the common logic, allowing us to focus on the use-case-specific parts.

E.g.,
- Watermarks: generated by source operators, handled by window operators.
- Checkpoint barrier: generated by the checkpoint coordinator, handled by all tasks
- Dynamic controlling: generated by JobMaster (in reaction to the REST command), handled by specific operators/UDFs
- Operator defined events: The following features are still in planning, but may potentially benefit from the control flow model. (Please correct me if I'm wrong, @Yun, @Jark)
  * Iteration: When a certain condition is met, we might want to signal downstream operators with an event
  * Mini-batch assembling: Flink currently uses special watermarks for indicating the end of each mini-batch, which makes it tricky to deal with event time related computations.
  * Hive dimension table join: For periodically reloaded hive tables, it would be helpful to have specific events signaling that a reloading is finished.
  * Bootstrap dimension table join: This is similar to the previous one. In cases where we want to fully load the dimension table before starting joining the mainstream, it would be helpful to have an event signaling the finishing of the bootstrap.
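
To make questions 1–3 concrete, here is one way such a model could be sketched. Every name below is hypothetical, purely for illustration — none of this is an existing or proposed Flink API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: a generator emits control events, a channel propagates
 *  them, and registered handlers react — mirroring questions 1-3 above. */
interface ControlEvent {
    String describe();
}

class WatermarkEvent implements ControlEvent {
    final long timestamp;

    WatermarkEvent(long ts) { this.timestamp = ts; }

    public String describe() { return "watermark@" + timestamp; }
}

class ControlChannel {
    private final List<Consumer<ControlEvent>> handlers = new ArrayList<>();

    /** Question 2: components that react to control messages register here. */
    void register(Consumer<ControlEvent> handler) { handlers.add(handler); }

    /** Question 3: propagation — broadcast the event to every registered handler. */
    void emit(ControlEvent event) { handlers.forEach(h -> h.accept(event)); }
}
```

In the real system the channel would be the network stack between tasks, and question 4 (consistency) is where most of the design effort would go.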

## Dynamic REST controlling
Back to the specific feature that Jiangang proposed, I personally think it's quite convenient. Currently, to dynamically change the behavior of an operator, we need to set up a separate source for the control events and leverage broadcast state. Being able to send the events via REST APIs definitely improves the usability.
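
The current workaround described above — a separate control stream plus broadcast state — has roughly this shape. The sketch below is a deliberately simplified, non-Flink model of the pattern (in real Flink it would be implemented with a broadcast stream and a `BroadcastProcessFunction`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Simplified model of the broadcast-state pattern: data elements and control
 *  elements arrive at the same operator; control elements update the rule. */
class BroadcastStyleOperator {
    private Predicate<Integer> rule = x -> true;  // the "broadcast state"
    private final List<Integer> emitted = new ArrayList<>();

    /** Main data stream. */
    void processElement(int value) {
        if (rule.test(value)) emitted.add(value);
    }

    /** Control stream: every parallel instance receives the same update. */
    void processBroadcastElement(Predicate<Integer> newRule) {
        this.rule = newRule;
    }

    List<Integer> output() { return emitted; }
}
```

The REST-based proposal would deliver `processBroadcastElement`-style updates from the JobMaster instead of requiring users to wire up a dedicated control source.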

Leveraging dynamic configuration frameworks is for sure one possible approach. The reason we are in favor of introducing the control flow is that:
- It benefits not only this specific dynamic controlling feature, but potentially other future features as well.
- AFAICS, it's non-trivial to make a 3rd-party dynamic configuration framework work together with Flink's consistency mechanism.

Thank you~

Xintong Song





Re: Add control mode for flink

Jark Wu-3
Thanks Xintong for the summary, 

I'm big +1 for this feature. 

Xintong's summary of Table/SQL's needs is correct.
The "custom (broadcast) event" feature is important to us,
and it even blocks further awesome features and optimizations in Table/SQL.
I have also discussed this topic offline with [hidden email] several times,
and we all agreed this is a reasonable feature, but one that needs some careful design.

Best,
Jark




Re: Add control mode for flink

liujiangang
In reply to this post by Xintong Song
Thanks Xintong Song for the detailed supplement. Since Flink is long-running, it is similar to many services, so interacting with it or controlling it is a common desire. This was our initial thought when implementing the feature. In our internal Flink, many configs that are normally set in the YAML can be adjusted dynamically to avoid restarting the job, for example:
  1. Limit the input QPS.
  2. Degrade the job by sampling and so on.
  3. Reset the Kafka offset in certain cases.
  4. Stop checkpointing in certain cases.
  5. Control the history consuming.
  6. Change the log level for debugging.

After deep discussion, we realized that a common control flow will benefit both users and developers. Dynamic config is just one of the use cases. The concrete design and implementation involve many components, like the JobMaster, network channels, operators, and so on, which need deeper consideration and design.
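
As an illustration of item 1, a QPS limit that can be adjusted without a restart can be modeled as a token bucket whose rate is mutable. This is a minimal sketch, not the actual Kuaishou implementation; all names are illustrative:

```java
/** Token bucket whose rate can be changed at runtime -- one way to realize
 *  the "limit input QPS" knob without restarting the job. */
class AdjustableRateLimiter {
    private volatile long permitsPerSecond;
    private double available;
    private long lastRefillNanos;

    AdjustableRateLimiter(long permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
        this.available = permitsPerSecond;
        this.lastRefillNanos = System.nanoTime();
    }

    /** Called from the control path, e.g. a REST handler. */
    void setRate(long newPermitsPerSecond) {
        this.permitsPerSecond = newPermitsPerSecond;
    }

    /** Non-blocking: the source would back off when this returns false. */
    synchronized boolean tryAcquire() {
        long now = System.nanoTime();
        double elapsedSec = (now - lastRefillNanos) / 1e9;
        available = Math.min(permitsPerSecond, available + elapsedSec * permitsPerSecond);
        lastRefillNanos = now;
        if (available >= 1.0) {
            available -= 1.0;
            return true;
        }
        return false;
    }
}
```

The control flow's job would be to deliver the `setRate` call consistently to every parallel source instance.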






Re: Add control mode for flink

JING ZHANG
Thanks Jiangang for bringing this up.
As mentioned in Jiangang's email, the `dynamic configuration framework` provides many useful functions at Kuaishou, because it can update a job's behavior without relaunching the job. These functions are very popular at Kuaishou, and we also see similar demands on the mailing list [1].

I'm big +1 for this feature.

Thanks Xintong and Yun for the deep thoughts about the issue. I like the idea of introducing a control mode in Flink.
It takes the original issue a big step closer to its essence, and it also opens the possibility for more fantastic features, as mentioned in Xintong's and Jark's responses.
Based on this idea, there are at least two milestones to achieve the goals proposed by Jiangang:
(1) Build a common control flow framework in Flink.
     It focuses on control flow propagation, and on how to integrate the common control flow framework with existing mechanisms.
(2) Build a dynamic configuration framework which is exposed to users directly.
     The dynamic configuration framework can be seen as a top-level application on the underlying control flow framework.
     It focuses on the public API which receives configuration update requests from users. Besides, it is necessary to introduce an API protection mechanism to avoid job performance degradation caused by too many control events.

I suggest splitting the whole design in two after we reach a consensus on whether to introduce this feature, because both sub-topics need careful design.



Best regards,
JING ZHANG

刘建刚 <[hidden email]> wrote on Tue, Jun 8, 2021 at 10:01 AM:
Thanks Xintong Song for the detailed supplement. Since flink jobs are long-running, they are similar to many services, so interacting with or controlling them is a common desire. This was our initial thought when implementing the feature. In our internal flink, many configs used in yaml can be adjusted dynamically to avoid restarting the job, for example:
  1. Limit the input qps.
  2. Degrade the job by sampling and so on.
  3. Reset kafka offsets in certain cases.
  4. Stop checkpoints in certain cases.
  5. Control the history consuming.
  6. Change the log level to debug a current problem.

After deep discussion, we realized that a common control flow will benefit both users and developers. Dynamic config is just one of the use cases. The concrete design and implementation relate to many components, like the jobmaster, network channels, operators and so on, which need deeper consideration and design.
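Item 6 in the list above needs almost no new machinery on the operator side; with java.util.logging, for instance, a handler triggered by a control event could flip a logger's level in place. A minimal sketch under that assumption (class and method names are illustrative only):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Toy sketch of item 6: raise a logger's verbosity at runtime, no restart. */
public class LogLevelSwitch {
    /** Would be called when a "change log level" control event arrives. */
    public static Logger setLevel(String loggerName, Level level) {
        Logger logger = Logger.getLogger(loggerName);
        logger.setLevel(level);     // takes effect immediately for this logger
        return logger;              // return a strong reference so the setting is not GC'd
    }
}
```

The interesting design question is not the level change itself but how the control event reaches every task manager consistently.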

Xintong Song [via Apache Flink User Mailing List archive.] <[hidden email]> wrote on Mon, Jun 7, 2021 at 2:52 PM:
Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.

I was part of the preliminary offline discussions before this proposal went public. So maybe I can help clarify things a bit.

In short, although the phrase "control mode" might be a bit misleading, what we truly want to do is make the concept of "control flow" explicit and expose it to users.

## Background
Jiangang & his colleagues at Kuaishou maintain an internal version of Flink. One of their custom features is allowing dynamically changing operator behaviors via the REST APIs. He's willing to contribute this feature to the community, and came to Yun Gao and me for suggestions. After discussion, we feel that the underlying question to be answered is how we model the control flow in Flink. Dynamically controlling jobs via REST API can be one of the features built on top of the control flow, and there could be others.

## Control flow
Control flow refers to the communication channels for sending events/signals to/between tasks/operators that change Flink's behavior in a way that may or may not affect the computation logic. Typical control events/signals Flink currently has are watermarks and checkpoint barriers.

In general, for modeling control flow, the following questions should be considered.
1. Who (which component) is responsible for generating the control messages?
2. Who (which component) is responsible for reacting to the messages?
3. How do the messages propagate?
4. When it comes to affecting the computation logic, how should the control flow work together with the exactly-once consistency guarantees?

1) & 2) may vary depending on the use cases, while 3) & 4) probably share many things in common. A unified control flow model would help deduplicate the common logic, allowing us to focus on the use-case-specific parts.

E.g.,
- Watermarks: generated by source operators, handled by window operators.
- Checkpoint barriers: generated by the checkpoint coordinator, handled by all tasks.
- Dynamic controlling: generated by JobMaster (in reaction to the REST command), handled by specific operators/UDFs
- Operator-defined events: The following features are still in planning, but may potentially benefit from the control flow model. (Please correct me if I'm wrong, @Yun, @Jark)
  * Iteration: When a certain condition is met, we might want to signal downstream operators with an event.
  * Mini-batch assembling: Flink currently uses special watermarks for indicating the end of each mini-batch, which makes it tricky to deal with event time related computations.
  * Hive dimension table join: For periodically reloaded hive tables, it would be helpful to have specific events signaling that a reloading is finished.
  * Bootstrap dimension table join: This is similar to the previous one. In cases where we want to fully load the dimension table before starting joining the mainstream, it would be helpful to have an event signaling the finishing of the bootstrap.

## Dynamic REST controlling
Back to the specific feature that Jiangang proposed, I personally think it's quite convenient. Currently, to dynamically change the behavior of an operator, we need to set up a separate source for the control events and leverage broadcast state. Being able to send the events via REST APIs definitely improves the usability.
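The broadcast-state pattern mentioned above boils down to control messages travelling in-band with data, so that a behavior change takes effect at a well-defined point in the record order. A single-threaded toy model of that idea (plain Java, deliberately not the Flink API; all names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of in-band control: an operator consumes a stream where control
 *  messages are interleaved with data records, mirroring how a broadcast
 *  control stream updates operator state between records. */
public class ControlledOperator {
    /** Base event type: either a data record or a control message. */
    static class Event {}
    static class Data extends Event {
        final int value;
        Data(int value) { this.value = value; }
    }
    static class Control extends Event {
        final int newThreshold;
        Control(int newThreshold) { this.newThreshold = newThreshold; }
    }

    private int threshold;

    public ControlledOperator(int initialThreshold) { this.threshold = initialThreshold; }

    /** Processes events in order; a control message changes behavior for all later records. */
    public List<Integer> process(List<Event> events) {
        List<Integer> out = new ArrayList<>();
        for (Event e : events) {
            if (e instanceof Control) {
                threshold = ((Control) e).newThreshold;   // react to the control message
            } else if (e instanceof Data && ((Data) e).value >= threshold) {
                out.add(((Data) e).value);                // ordinary data path
            }
        }
        return out;
    }
}
```

Because the control message sits in the same ordered stream as the data, "which records saw the old config" is unambiguous, which is exactly the property a REST-driven out-of-band update has to recreate.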

Leveraging dynamic configuration frameworks is for sure one possible approach. The reasons we are in favor of introducing the control flow are:
- It benefits not only this specific dynamic controlling feature, but potentially other future features as well.
- AFAICS, it's non-trivial to make a 3rd-party dynamic configuration framework work together with Flink's consistency mechanism.

Thank you~

Xintong Song



On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]> wrote:
Thank you for the reply. I have checked the post you mentioned. The dynamic config may be useful sometimes, but it is hard to keep data consistent in flink; for example, what happens if the dynamic config takes effect during failover? Since dynamic config is a desire of users, maybe flink can support it in some way.

For the control mode, dynamic config is just one of the control modes. In the google doc, I have listed some other cases; for example, control events can be generated in operators or by external services. Besides the user's dynamic config, the flink system can support some common dynamic configuration, like a qps limit, checkpoint control and so on.

It needs a good design to handle the control mode structure. Based on that, other control features can be added easily later, like changing the log level while the job is running. In the end, flink will not just process data, but also interact with users and receive control events like a service.


Re: Add control mode for flink

kai wang


I'm a big +1 for this feature.
  1. Limit the input qps.
  2. Change the log level for debugging.
In my team, the two examples above are needed.


Re: Re: Add control mode for flink

Yun Gao
Very thanks Jiangang for bringing this up, and very thanks for the discussion!

I also agree with the summarization by Xintong and Jing that control flow seems to be a common building block for many functionalities, and a dynamic configuration framework is a representative application that is frequently required by users. Regarding the control flow, we are currently also considering the design of iteration for flink-ml, and as Xintong has pointed out, it also requires the control flow in cases like detecting global termination inside the iteration (in this case we need to broadcast an event through the iteration body to detect whether there are still records residing in the iteration body). And regarding whether to implement the dynamic configuration framework, I also agree with Xintong that the consistency guarantee would be a point to consider; we might consider whether we need to ensure that every operator can receive the dynamic configuration.

Best,
Yun



------------------------------------------------------------------
Sender:kai wang<[hidden email]>
Date:2021/06/08 11:52:12
Recipient:JING ZHANG<[hidden email]>
Cc:刘建刚<[hidden email]>; Xintong Song [via Apache Flink User Mailing List archive.]<[hidden email]>; user<[hidden email]>; dev<[hidden email]>
Theme:Re: Add control mode for flink



I'm big +1 for this feature. 
  1. Limit the input qps.
  2. Change log level for debug.
in my team, the two examples above are needed



Re: Re: Add control mode for flink

Xintong Song
+1 on separating the effort into two steps:
  1. Introduce a common control flow framework, with flexible interfaces for generating / reacting to control messages for various purposes.
  2. Features that leverage the control flow can be worked on concurrently.
In the meantime, continuing to collect potential features that may leverage the control flow should be helpful. It provides good input for the control flow framework design, to make the framework common enough to cover the potential use cases.

My suggestions on the next steps:
  1. Allow more time for opinions to be heard and potential use cases to be collected.
  2. Draft a FLIP with the scope of the common control flow framework.
  3. We probably need a PoC implementation to make sure the framework covers at least the following scenarios:
    1. Produce control events from arbitrary operators
    2. Produce control events from the JobMaster
    3. Consume control events from arbitrary operators downstream of where the events are produced
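As a minimal mock of the three PoC scenarios (all names are illustrative, not proposed Flink API): both an operator and the JobMaster feed control events into a shared channel, and a downstream operator drains it. A real implementation would of course propagate events through the network stack instead of a queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Toy sketch of the PoC scenarios: control events may be produced by an
 *  operator or by the JobMaster, and are consumed downstream of the producer. */
public class ControlChannel {
    private final ConcurrentLinkedQueue<String> channel = new ConcurrentLinkedQueue<>();

    // Scenario 1: an operator produces a control event.
    public void emitFromOperator(String event) { channel.add("operator:" + event); }

    // Scenario 2: the JobMaster produces a control event (e.g. from a REST call).
    public void emitFromJobMaster(String event) { channel.add("jobmaster:" + event); }

    // Scenario 3: a downstream operator drains and reacts to pending control events.
    public List<String> consumeDownstream() {
        List<String> drained = new ArrayList<>();
        String e;
        while ((e = channel.poll()) != null) drained.add(e);
        return drained;
    }
}
```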

Thank you~

Xintong Song



On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <[hidden email]> wrote:
Very thanks Jiangang for bringing this up and very thanks for the discussion! 

I also agree with the summarization by Xintong and Jing that control flow seems to be
a common buidling block for many functionalities and dynamic configuration framework
is a representative application that frequently required by users. Regarding the control flow, 
currently we are also considering the design of iteration for the flink-ml, and as Xintong has pointed
out, it also required the control flow in cases like detection global termination inside the iteration
 (in this case we need to broadcast an event through the iteration body to detect if there are still 
records reside in the iteration body). And regarding  whether to implement the dynamic configuration 
framework, I also agree with Xintong that the consistency guarantee would be a point to consider, we 
might consider if we need to ensure every operator could receive the dynamic configuration. 

Best,
Yun



------------------------------------------------------------------
Sender:kai wang<[hidden email]>
Date:2021/06/08 11:52:12
Recipient:JING ZHANG<[hidden email]>
Cc:刘建刚<[hidden email]>; Xintong Song [via Apache Flink User Mailing List archive.]<[hidden email]>; user<[hidden email]>; dev<[hidden email]>
Subject:Re: Add control mode for flink



I'm a big +1 for this feature.
  1. Limit the input QPS.
  2. Change the log level for debugging.
In my team, the two examples above are needed.

JING ZHANG <[hidden email]> wrote on Tue, Jun 8, 2021 at 11:18 AM:
Thanks Jiangang for bringing this up.
As mentioned in Jiangang's email, the `dynamic configuration framework` provides many useful functions at Kuaishou, because it can update job behavior without relaunching the job. These functions are very popular at Kuaishou, and we also see similar demands on the mailing list [1].

I'm a big +1 for this feature.

Thanks Xintong and Yun for the deep thoughts about the issue. I like the idea of introducing control flow in Flink.
It takes the original issue a big step closer to its essence, and also opens up the possibility for more features, as mentioned in Xintong's and Jark's responses.
Based on the idea, there are at least two milestones to achieve the goals proposed by Jiangang:
(1) Build a common control flow framework in Flink.
     It focuses on control flow propagation, and on how to integrate the common control flow framework with existing mechanisms.
(2) Build a dynamic configuration framework which is exposed to users directly.
     We could see the dynamic configuration framework as an application built on top of the underlying control flow framework.
     It focuses on the public API which receives configuration update requests from users. Besides, it is necessary to introduce an API protection mechanism to avoid job performance degradation caused by too many control events.
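The "API protection mechanism" mentioned above could be as simple as throttling control requests with a token bucket. A minimal Python sketch follows; the class name, capacity, and refill rate are all illustrative and not part of any proposed Flink API.

```python
import time

class TokenBucket:
    """Throttle control requests: allow short bursts, cap the sustained rate."""
    def __init__(self, capacity, refill_per_sec):
        self.capacity = capacity
        self.tokens = float(capacity)
        self.refill_per_sec = refill_per_sec
        self.last = time.monotonic()

    def allow(self):
        # Refill tokens proportionally to elapsed time, then spend one if available.
        now = time.monotonic()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.refill_per_sec)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(capacity=2, refill_per_sec=1)
results = [bucket.allow() for _ in range(3)]  # a burst of 3 control requests
print(results)  # -> [True, True, False]: the third request is rejected
```

A control endpoint rejecting the third request in a burst would return an HTTP 429-style error instead of flooding the job with control events.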

I suggest splitting the whole design into two after we reach a consensus on whether to introduce this feature, because these two sub-topics both need careful design.



Best regards,
JING ZHANG

刘建刚 <[hidden email]> wrote on Tue, Jun 8, 2021 at 10:01 AM:
Thanks Xintong Song for the detailed supplement. Since Flink jobs are long-running, they are similar to many services, so interacting with them or controlling them is a common desire. This was our initial thought when implementing the feature. In our internal Flink, many configs used in the YAML file can be adjusted dynamically to avoid restarting the job, for example:
  1. Limit the input QPS.
  2. Degrade the job by sampling, and so on.
  3. Reset the Kafka offset in certain cases.
  4. Stop checkpointing in certain cases.
  5. Control the history consumption.
  6. Change the log level for debugging.

After deep discussion, we realized that a common control flow will benefit both users and developers. Dynamic config is just one of the use cases. As for the concrete design and implementation, it involves many components, like the JobMaster, network channels, operators and so on, which needs deeper consideration and design.

Xintong Song [via Apache Flink User Mailing List archive.] <[hidden email]> wrote on Mon, Jun 7, 2021 at 2:52 PM:
Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.

I was part of the preliminary offline discussions before this proposal went public. So maybe I can help clarify things a bit.

In short, although the phrase "control mode" might be a bit misleading, what we truly want to do is make the concept of "control flow" explicit and expose it to users.

## Background
Jiangang and his colleagues at Kuaishou maintain an internal version of Flink. One of their custom features allows dynamically changing operator behaviors via the REST APIs. He's willing to contribute this feature to the community, and came to Yun Gao and me for suggestions. After discussion, we feel that the underlying question to be answered is how we model control flow in Flink. Dynamically controlling jobs via the REST API can be one of the features built on top of the control flow, and there could be others.

## Control flow
Control flow refers to the communication channels for sending events/signals to/between tasks/operators, which change Flink's behavior in a way that may or may not affect the computation logic. Typical control events/signals Flink currently has are watermarks and checkpoint barriers.

In general, for modeling control flow, the following questions should be considered:
1. Who (which component) is responsible for generating the control messages?
2. Who (which component) is responsible for reacting to the messages?
3. How do the messages propagate?
4. When it comes to affecting the computation logic, how should the control flow work together with the exactly-once consistency guarantees?

1) & 2) may vary depending on the use cases, while 3) & 4) probably share many things in common. A unified control flow model would help deduplicate the common logic, allowing us to focus on the use-case-specific parts.

E.g.,
- Watermarks: generated by source operators, handled by window operators.
- Checkpoint barriers: generated by the checkpoint coordinator, handled by all tasks.
- Dynamic controlling: generated by the JobMaster (in reaction to the REST command), handled by specific operators/UDFs.
- Operator-defined events: The following features are still in planning, but may potentially benefit from the control flow model. (Please correct me if I'm wrong, @Yun, @Jark)
  * Iteration: When a certain condition is met, we might want to signal downstream operators with an event.
  * Mini-batch assembling: Flink currently uses special watermarks to indicate the end of each mini-batch, which makes it tricky to deal with event-time-related computations.
  * Hive dimension table join: For periodically reloaded Hive tables, it would be helpful to have specific events signaling that a reload has finished.
  * Bootstrap dimension table join: This is similar to the previous one. In cases where we want to fully load the dimension table before starting to join the main stream, it would be helpful to have an event signaling the completion of the bootstrap.

## Dynamic REST controlling
Back to the specific feature that Jiangang proposed, I personally think it's quite convenient. Currently, to dynamically change the behavior of an operator, we need to set up a separate source for the control events and leverage broadcast state. Being able to send the events via REST APIs definitely improves the usability.
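The workaround described above (a separate control source whose elements are broadcast to every parallel instance, each keeping the latest config as broadcast state) can be modeled in a few lines of plain Python. This is a sketch of the pattern only, not the Flink DataStream API.

```python
class FilterInstance:
    """One parallel subtask of a user-defined filter operator."""
    def __init__(self):
        self.threshold = 0            # "broadcast state": latest control config

    def on_control(self, config):     # handle an element of the control stream
        self.threshold = config["threshold"]

    def on_record(self, value):       # handle an element of the data stream
        return value > self.threshold

instances = [FilterInstance() for _ in range(3)]   # parallelism = 3

def broadcast_control(config):
    # A broadcast stream delivers each control element to every subtask,
    # so all parallel instances end up with the same config.
    for inst in instances:
        inst.on_control(config)

print([inst.on_record(5) for inst in instances])   # -> [True, True, True]
broadcast_control({"threshold": 10})
print([inst.on_record(5) for inst in instances])   # -> [False, False, False]
```

A REST-driven control flow would replace the dedicated control source here, but the per-subtask state update would look much the same.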

Leveraging dynamic configuration frameworks is for sure one possible approach. The reasons we are in favor of introducing the control flow are:
- It benefits not only this specific dynamic controlling feature, but potentially other future features as well.
- AFAICS, it's non-trivial to make a 3rd-party dynamic configuration framework work together with Flink's consistency mechanism.

Thank you~

Xintong Song



On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]> wrote:
Thank you for the reply. I have checked the post you mentioned. The dynamic config may be useful sometimes, but it is hard to keep data consistent in Flink; for example, what happens if the dynamic config takes effect during failover? Since dynamic config is something users desire, maybe Flink can support it in some way.

For the control mode, dynamic config is just one of the control modes. In the Google doc, I have listed some other cases, for example, control events generated in operators or external services. Besides users' dynamic config, the Flink system can support some common dynamic configuration, like QPS limits, checkpoint control and so on.

It needs a good design to handle the control mode structure. Based on that, other control features can be added easily later, like changing the log level while the job is running. In the end, Flink will not just process data, but also interact with users to receive control events, like a service.

Steven Wu <[hidden email]> wrote on Fri, Jun 4, 2021 at 11:11 PM:
I am not sure if we should solve this problem in Flink. This is more like a dynamic config problem that probably should be solved by some configuration framework. Here is one post from google search: https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a

On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 <[hidden email]> wrote:
Hi everyone,

      Flink jobs are always long-running. When the job is running, users may want to control the job without stopping it. The reasons for control can differ, as follows:

  1. Change the data processing logic, such as a filter condition.

  2. Send trigger events to move the progress forward.

  3. Define some tools to degrade the job, such as limiting input QPS or sampling data.

  4. Change the log level to debug a current problem.

      The common way to do this is to stop the job, make modifications and start the job again. It may take a long time to recover. In some situations, stopping the job is intolerable, for example, when the job is related to money or important activities. So we need technologies to control the running job without stopping it.


We propose to add a control mode for Flink. A control mode based on a RESTful interface is introduced first. It works in these steps:

  1. The user predefines some logic which supports config control, such as a filter condition.
  2. Run the job.
  3. If the user wants to change the job's running logic, they just send a RESTful request with the corresponding config.
Other control modes will also be considered in the future. More details can be found in the doc https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing . If the community likes the proposal, more discussion is needed and a more detailed design will be given later. Any suggestions and ideas are welcome.
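The three steps above could look roughly like the following end-to-end sketch, with the RESTful request simulated as a plain function call. The config key and the handler are invented for illustration and are not part of the proposal.

```python
# Step 1: predefine controllable logic -- the filter condition lives in a
# config registry rather than being hard-coded in the operator.
job_config = {"filter.condition": lambda x: True}

def filter_operator(records):
    cond = job_config["filter.condition"]   # re-read the config on each batch
    return [r for r in records if cond(r)]

def handle_rest_request(key, value):
    # Stands in for something like "PATCH /jobs/<job-id>/config"; a real
    # implementation would also validate and persist the request.
    job_config[key] = value

print(filter_operator([1, 5, 12]))          # Step 2: run -> [1, 5, 12]
handle_rest_request("filter.condition", lambda x: x > 4)   # Step 3
print(filter_operator([1, 5, 12]))          # -> [5, 12], with no restart
```

The key property is that the second call picks up the new condition without the job being stopped, which is the whole point of the proposal.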




If you reply to this email, your message will be added to the discussion below:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Add-control-mode-for-flink-tp44203p44245.html
To start a new topic under Apache Flink User Mailing List archive., email [hidden email]
To unsubscribe from Apache Flink User Mailing List archive., click here.
NAML

Reply | Threaded
Open this post in threaded view
|

Re: Re: Add control mode for flink

Steven Wu

I can see the benefits of control flow. E.g., it might help the old (and inactive) FLIP-17 side input. I would suggest that we add more details for some of the potential use cases.

Here is one mismatch with using control flow for dynamic config: dynamic config is typically targeted at/loaded by one specific operator, while control flow would propagate the dynamic config to all operators. Not a problem per se.

Regarding using the REST API (to the jobmanager) for accepting control signals from an external system, where are we going to persist/checkpoint the signal? The jobmanager can die before the control signal is propagated and checkpointed. Would we lose the control signal in this case?



Reply | Threaded
Open this post in threaded view
|

Re: Re: Add control mode for flink

liujiangang
Thanks for the reply. It is a good question. There are multiple choices, as follows:
  1. We can persist control signals in the HighAvailabilityServices and replay them after failover.
  2. Only tell the users that the control signals take effect after they are checkpointed.
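Option 1 above can be sketched as follows. The "store" here is just an in-memory list standing in for HighAvailabilityServices, and the interplay with checkpointing is out of scope; every name in this sketch is illustrative.

```python
ha_store = []                      # stands in for HighAvailabilityServices

class JobMaster:
    def __init__(self):
        self.applied = []

    def accept_signal(self, signal):
        ha_store.append(signal)    # persist first, so a crash cannot lose it
        self.applied.append(signal)

    @classmethod
    def recover(cls):
        jm = cls()
        for signal in ha_store:    # replay persisted signals after failover
            jm.applied.append(signal)
        return jm

jm = JobMaster()
jm.accept_signal({"qps.limit": 1000})
del jm                             # simulate the JobMaster dying
recovered = JobMaster.recover()
print(recovered.applied)           # -> [{'qps.limit': 1000}]
```

Persisting before applying answers Steven's concern: even if the JobMaster dies before the signal is propagated, the recovered JobMaster replays it from the HA store.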

Steven Wu [via Apache Flink User Mailing List archive.] <[hidden email]> 于2021年6月8日周二 下午2:15写道:

I can see the benefits of control flow. E.g., it might help the old (and inactive) FLIP-17 side input. I would suggest that we add more details of some of the potential use cases.

Here is one mismatch with using control flow for dynamic config. Dynamic config is typically targeted/loaded by one specific operator. Control flow will propagate the dynamic config to all operators. not a problem per se 

Regarding using the REST api (to jobmanager) for accepting control signals from external system, where are we going to persist/checkpoint the signal? jobmanager can die before the control signal is propagated and checkpointed. Did we lose the control signal in this case?


On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <[hidden email]> wrote:
+1 on separating the effort into two steps:
  1. Introduce a common control flow framework, with flexible interfaces for generating / reacting to control messages for various purposes.
  2. Features that leverating the control flow can be worked on concurrently
Meantime, keeping collecting potential features that may leverage the control flow should be helpful. It provides good inputs for the control flow framework design, to make the framework common enough to cover the potential use cases.

My suggestions on the next steps:
  1. Allow more time for opinions to be heard and potential use cases to be collected
  2. Draft a FLIP with the scope of common control flow framework
  3. We probably need a poc implementation to make sure the framework covers at least the following scenarios
    1. Produce control events from arbitrary operators
    2. Produce control events from JobMaster
    3. Consume control events from arbitrary operators downstream where the events are produced

Thank you~

Xintong Song



On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <[hidden email]> wrote:
Very thanks Jiangang for bringing this up and very thanks for the discussion! 

I also agree with the summarization by Xintong and Jing that control flow seems to be
a common buidling block for many functionalities and dynamic configuration framework
is a representative application that frequently required by users. Regarding the control flow, 
currently we are also considering the design of iteration for the flink-ml, and as Xintong has pointed
out, it also required the control flow in cases like detection global termination inside the iteration
 (in this case we need to broadcast an event through the iteration body to detect if there are still 
records reside in the iteration body). And regarding  whether to implement the dynamic configuration 
framework, I also agree with Xintong that the consistency guarantee would be a point to consider, we 
might consider if we need to ensure every operator could receive the dynamic configuration. 

Best,
Yun



------------------------------------------------------------------
Sender:kai wang<[hidden email]>
Date:2021/06/08 11:52:12
Recipient:JING ZHANG<[hidden email]>
Cc:刘建刚<[hidden email]>; Xintong Song [via Apache Flink User Mailing List archive.]<[hidden email]>; user<[hidden email]>; dev<[hidden email]>
Theme:Re: Add control mode for flink



I'm big +1 for this feature. 
  1. Limit the input qps.
  2. Change log level for debug.
in my team, the two examples above are needed

JING ZHANG <[hidden email]> 于2021年6月8日周二 上午11:18写道:
Thanks Jiangang for bringing this up. 
As mentioned in Jiangang's email, `dynamic configuration framework` provides many useful functions in Kuaishou, because it could update job behavior without relaunching the job. The functions are very popular in Kuaishou, we also see similar demands in maillist [1].

I'm big +1 for this feature.

Thanks Xintong and Yun for deep thoughts about the issue. I like the idea about introducing control mode in Flink. 
It takes the original issue a big step closer to essence which also provides the possibility for more fantastic features as mentioned in Xintong and Jark's response.
Based on the idea, there are at least two milestones to achieve the goals which were proposed by Jiangang:
(1) Build a common control flow framework in Flink. 
     It focuses on control flow propagation. And, how to integrate the common control flow framework with existing mechanisms.
(2) Builds a dynamic configuration framework which is exposed to users directly. 
     We could see dynamic configuration framework is a top application on the underlying control flow framework. 
     It focuses on the Public API which receives configuration updating requests from users. Besides, it is necessary to introduce an API protection mechanism to avoid job performance degradation caused by too many control events.

I suggest splitting the whole design into two after we reach a consensus on whether to introduce this feature because these two sub-topic all need careful design.



Best regards,
JING ZHANG

刘建刚 <[hidden email]> 于2021年6月8日周二 上午10:01写道:
Thanks Xintong Song for the detailed supplement. Since flink is long-running, it is similar to many services. So interacting with it or controlling it is a common desire. This was our initial thought when implementing the feature. In our inner flink, many configs used in yaml can be adjusted by dynamic to avoid restarting the job, for examples as follow:
  1. Limit the input qps.
  2. Degrade the job by sampling and so on.
  3. Reset kafka offset in certain cases.
  4. Stop checkpoint in certain cases.
  5. Control the history consuming.
  6. Change log level for debug.

After deep discussion, we realize that a common control flow will benefit both users and developers. Dynamic config is just one of the use cases. For the concrete design and implementation, it relates with many components, like jobmaster, network channel, operators and so on, which needs deeper consideration and design. 

Xintong Song [via Apache Flink User Mailing List archive.] <[hidden email]> 于2021年6月7日周一 下午2:52写道:
Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.

I was part of the preliminary offline discussions before this proposal went public. So maybe I can help clarify things a bit.

In short, despite the phrase "control mode" might be a bit misleading, what we truly want to do from my side is to make the concept of "control flow" explicit and expose it to users.

## Background
Jiangang & his colleagues at Kuaishou maintain an internal version of Flink. One of their custom features is allowing dynamically changing operator behaviors via the REST APIs. He's willing to contribute this feature to the community, and came to Yun Gao and me for suggestions. After discussion, we feel that the underlying question to be answered is how do we model the control flow in Flink. Dynamically controlling jobs via REST API can be one of the features built on top of the control flow, and there could be others.

## Control flow
Control flow refers to the communication channels for sending events/signals to/between tasks/operators, that changes Flink's behavior in a way that may or may not affect the computation logic. Typical control events/signals Flink currently has are watermarks and checkpoint barriers. 

In general, for modeling control flow, the following questions should be considered.
1. Who (which component) is responsible for generating the control messages?
2. Who (which component) is responsible for reacting to the messages.
3. How do the messages propagate?
4. When it comes to affecting the computation logics, how should the control flow work together with the exact-once consistency. 

1) & 2) may vary depending on the use cases, while 3) & 4) probably share many things in common. A unified control flow model would help deduplicate the common logics, allowing us to focus on the use case specific parts.

E.g.,
- Watermarks: generated by source operators, handled by window operators.
- Checkpoint barrier: generated by the checkpoint coordinator, handled by all tasks
- Dynamic controlling: generated by JobMaster (in reaction to the REST command), handled by specific operators/UDFs
- Operator defined events: The following features are still in planning, but may potentially benefit from the control flow model. (Please correct me if I'm wrong, @Yun, @Jark)
  * Iteration: When a certain condition is met, we might want to signal downstream operators with an event
  * Mini-batch assembling: Flink currently uses special watermarks for indicating the end of each mini-batch, which makes it tricky to deal with event time related computations.
  * Hive dimension table join: For periodically reloaded hive tables, it would be helpful to have specific events signaling that a reloading is finished.
  * Bootstrap dimension table join: This is similar to the previous one. In cases where we want to fully load the dimension table before starting joining the mainstream, it would be helpful to have an event signaling the finishing of the bootstrap.
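To make the four questions above concrete, here is a minimal, self-contained sketch in plain Java (not Flink APIs; all names such as `ControlEvent` and `FilterOperator` are hypothetical) of a control event travelling in-band through the same channel as data records, so that its effect is ordered with respect to the data:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical types for illustration only; these are not Flink APIs.
interface StreamElement {}
record DataRecord(long value) implements StreamElement {}
record ControlEvent(String key, String value) implements StreamElement {}

// An operator that reacts to control events by updating its filter threshold.
class FilterOperator {
    long threshold = 0;
    final List<Long> emitted = new ArrayList<>();

    void process(StreamElement element) {
        if (element instanceof ControlEvent e) {
            if ("filter.threshold".equals(e.key())) {
                threshold = Long.parseLong(e.value());
            }
        } else if (element instanceof DataRecord r && r.value() >= threshold) {
            emitted.add(r.value());
        }
    }
}

class ControlFlowSketch {
    // The control event is inserted into the same channel as the records, so
    // records before it see the old threshold and records after it the new one.
    static List<Long> run() {
        Queue<StreamElement> channel = new ArrayDeque<>();
        channel.add(new DataRecord(1));
        channel.add(new DataRecord(5));
        channel.add(new ControlEvent("filter.threshold", "4")); // e.g. injected by the JobMaster
        channel.add(new DataRecord(2)); // below the new threshold, dropped
        channel.add(new DataRecord(7));

        FilterOperator op = new FilterOperator();
        while (!channel.isEmpty()) {
            op.process(channel.poll());
        }
        return op.emitted;
    }

    public static void main(String[] args) {
        System.out.println(ControlFlowSketch.run()); // prints [1, 5, 7]
    }
}
```

Propagating the event in-band like this is what would let a control message compose with checkpoint barriers and exactly-once consistency, which an out-of-band config push cannot guarantee.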

## Dynamic REST controlling
Back to the specific feature that Jiangang proposed, I personally think it's quite convenient. Currently, to dynamically change the behavior of an operator, we need to set up a separate source for the control events and leverage broadcast state. Being able to send the events via REST APIs definitely improves the usability.
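The workaround described above (a dedicated control source whose elements are broadcast to every parallel instance of an operator) can be simulated in plain Java as follows. This only illustrates the data flow; the real pattern uses Flink's broadcast stream and `BroadcastProcessFunction`, and the class names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the broadcast-state workaround: control elements
// are fanned out to every parallel instance of the operator, while data
// records are partitioned across instances.
class FilterSubtask {
    final Map<String, String> broadcastState = new HashMap<>();
    final List<Long> output = new ArrayList<>();

    // Every parallel subtask receives every control element.
    void processBroadcastElement(String key, String value) {
        broadcastState.put(key, value);
    }

    // Data records are only seen by the subtask they are routed to.
    void processElement(long value) {
        long threshold = Long.parseLong(broadcastState.getOrDefault("threshold", "0"));
        if (value >= threshold) {
            output.add(value);
        }
    }
}

class BroadcastSketch {
    static List<List<Long>> run() {
        List<FilterSubtask> subtasks = List.of(new FilterSubtask(), new FilterSubtask());
        subtasks.get(0).processElement(3);  // before the update, threshold is 0
        subtasks.get(1).processElement(8);
        for (FilterSubtask s : subtasks) {  // the control update reaches all subtasks
            s.processBroadcastElement("threshold", "5");
        }
        subtasks.get(0).processElement(4);  // below the new threshold, dropped
        subtasks.get(1).processElement(9);  // kept
        List<List<Long>> outputs = new ArrayList<>();
        for (FilterSubtask s : subtasks) {
            outputs.add(s.output);
        }
        return outputs;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [[3], [8, 9]]
    }
}
```

A REST-triggered control flow would replace the user-managed control source with events injected by the JobMaster, while keeping the same fan-out semantics.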

Leveraging dynamic configuration frameworks is for sure one possible approach. The reasons we are in favor of introducing the control flow are:
- It benefits not only this specific dynamic controlling feature, but potentially other future features as well.
- AFAICS, it's non-trivial to make a 3rd-party dynamic configuration framework work together with Flink's consistency mechanism.

Thank you~

Xintong Song



On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]> wrote:
Thank you for the reply. I have checked the post you mentioned. The dynamic config may be useful sometimes, but it is hard to keep data consistent in Flink; for example, what happens to a dynamic config change on failover? Since dynamic config is something users want, maybe Flink can support it in some way.

For the control mode, dynamic config is just one of the control modes. In the google doc, I have listed some other cases; for example, control events generated in operators or external services. Besides users' dynamic config, the Flink system can support some common dynamic configuration, like a QPS limit, checkpoint control and so on.

The control mode structure needs a good design. Based on that, other control features can be added easily later, like changing the log level while the job is running. In the end, Flink will not just process data, but also interact with users to receive control events, like a service.
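As an illustration of one dynamic configuration mentioned above (the QPS limit), here is a minimal token-bucket sketch in plain Java whose rate can be changed at runtime, e.g. in reaction to a control event. The class is hypothetical and not a Flink API; a clock is injected only to keep the example deterministic:

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: a per-record rate limiter whose limit can be updated
// from the control path while the data path keeps running.
class DynamicRateLimiter {
    private volatile double permitsPerSecond;
    private double available;
    private long lastNanos;
    private final LongSupplier clock;

    DynamicRateLimiter(double permitsPerSecond, LongSupplier nanoClock) {
        this.permitsPerSecond = permitsPerSecond;
        this.clock = nanoClock;
        this.lastNanos = nanoClock.getAsLong();
        this.available = permitsPerSecond; // allow an initial one-second burst
    }

    // Called from the control path, e.g. when a "qps limit" config update arrives.
    void setRate(double permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    // Called on the data path for every record; returns true if the record may pass.
    synchronized boolean tryAcquire() {
        long now = clock.getAsLong();
        available = Math.min(permitsPerSecond,
                available + (now - lastNanos) / 1e9 * permitsPerSecond);
        lastNanos = now;
        if (available >= 1.0) {
            available -= 1.0;
            return true;
        }
        return false;
    }
}
```

With the rate set to 2, two records pass immediately, the third is rejected, and after one simulated second the bucket refills; calling `setRate` changes the refill rate without restarting anything.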

Steven Wu <[hidden email]> wrote on Fri, Jun 4, 2021 at 11:11 PM:
I am not sure if we should solve this problem in Flink. This is more like a dynamic config problem that probably should be solved by some configuration framework. Here is one post from google search: https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a

On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 <[hidden email]> wrote:
Hi everyone,

      Flink jobs are always long-running. When a job is running, users may want to control the job without stopping it. The reasons for control can differ, as follows:

  1. Change the data processing logic, such as a filter condition.

  2. Send trigger events to move the progress forward.

  3. Define some tools to degrade the job, such as limiting input QPS or sampling data.

  4. Change the log level to debug a current problem.

      The common way to do this is to stop the job, make modifications and restart the job. It may take a long time to recover. In some situations, stopping jobs is intolerable, for example, when the job is related to money or important activities. So we need some technology to control a running job without stopping it.


We propose to add a control mode for Flink. A control mode based on the REST interface is introduced first. It works by these steps:

  1. The user predefines some logic which supports config control, such as a filter condition.
  2. Run the job.
  3. If the user wants to change the job's running logic, just send a REST request with the corresponding config.
Other control modes will also be considered in the future. A more detailed introduction can be found in the doc https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing . If the community likes the proposal, more discussion is needed and a more detailed design will be given later. Any suggestions and ideas are welcome.
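The three steps above can be sketched in plain Java as follows. Everything here is hypothetical and only models the idea; in the real proposal, step 3 would be a REST call handled by the JobMaster that forwards the config to the operator:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongPredicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Step 1: predefine logic that supports config control, here a filter whose
// predicate can be swapped atomically while records keep flowing.
class ControllableFilter {
    private final AtomicReference<LongPredicate> predicate;

    ControllableFilter(LongPredicate initial) {
        this.predicate = new AtomicReference<>(initial);
    }

    // Step 3: applied when a control request with new config arrives
    // (in the proposal, via a REST endpoint on the JobMaster).
    void updateConfig(LongPredicate newPredicate) {
        predicate.set(newPredicate);
    }

    // Step 2: the running job calls this for incoming records.
    List<Long> apply(long[] records) {
        LongPredicate p = predicate.get();
        return LongStream.of(records).filter(p).boxed().collect(Collectors.toList());
    }
}
```

Note that such an out-of-band swap takes effect "somewhere" in the stream; making the change point well-defined with respect to checkpoints is exactly the consistency question raised elsewhere in this thread.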





Re: Re: Add control mode for flink

Steven Wu
Option 2 is probably not feasible, as a checkpoint may take a long time or may fail.

Option 1 might work, although it complicates job recovery and checkpointing. After checkpoint completion, we need to clean up those control signals stored in the HA service.

On Tue, Jun 8, 2021 at 1:14 AM 刘建刚 <[hidden email]> wrote:
Thanks for the reply. It is a good question. There are multiple choices, as follows:

   1. We can persist control signals in HighAvailabilityServices and replay them after failover.
   2. Only tell the users that the control signals take effect after they are checkpointed.
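Option 1 could look roughly like this sketch (plain Java; `SignalStore` and its method names are hypothetical, not Flink APIs): signals are appended to an HA-backed store, replayed on failover, and pruned once a completed checkpoint covers them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of option 1: control signals are persisted (here an
// in-memory list stands in for an HA-backed store), replayed after failover,
// and cleaned up once a completed checkpoint covers them.
class SignalStore {
    record Signal(long id, String payload) {}

    private final List<Signal> pending = new ArrayList<>();
    private long nextId = 0;

    // Persist a signal before acknowledging the caller.
    long accept(String payload) {
        long id = nextId++;
        pending.add(new Signal(id, payload));
        return id;
    }

    // On failover, replay every signal not yet covered by a completed checkpoint.
    List<Signal> replayAfterFailover() {
        return List.copyOf(pending);
    }

    // After a checkpoint completes, drop the signals it already covers;
    // this is the cleanup step mentioned in the discussion.
    void onCheckpointComplete(long lastSignalIdInCheckpoint) {
        pending.removeIf(s -> s.id() <= lastSignalIdInCheckpoint);
    }
}
```

Replaying only the signals newer than the last completed checkpoint keeps recovery idempotent: signals already reflected in the restored state are not applied twice.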


Steven Wu [via Apache Flink User Mailing List archive.] <[hidden email]> wrote on Tue, Jun 8, 2021 at 2:15 PM:

>
> I can see the benefits of control flow. E.g., it might help the old (and
> inactive) FLIP-17 side input. I would suggest that we add more details of
> some of the potential use cases.
>
> Here is one mismatch with using control flow for dynamic config. Dynamic
> config is typically targeted at / loaded by one specific operator, while
> control flow will propagate the dynamic config to all operators, though that
> is not a problem per se.
>
> Regarding using the REST API (to the jobmanager) for accepting control
> signals from an external system, where are we going to persist/checkpoint the
> signal? The jobmanager can die before the control signal is propagated and
> checkpointed. Do we lose the control signal in this case?
>
>
> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <[hidden email]> wrote:
>
>> +1 on separating the effort into two steps:
>>
>>    1. Introduce a common control flow framework, with flexible
>>    interfaces for generating / reacting to control messages for various
>>    purposes.
>>    2. Features that leverage the control flow can be worked on
>>    concurrently.
>>
>> In the meantime, we should keep collecting potential features that may
>> leverage the control flow. They provide good inputs for the control flow
>> framework design, making the framework common enough to cover the potential
>> use cases.
>>
>> My suggestions on the next steps:
>>
>>    1. Allow more time for opinions to be heard and potential use cases
>>    to be collected
>>    2. Draft a FLIP with the scope of common control flow framework
>>    3. We probably need a poc implementation to make sure the framework
>>    covers at least the following scenarios
>>       1. Produce control events from arbitrary operators
>>       2. Produce control events from JobMaster
>>       3. Consume control events from arbitrary operators downstream
>>       where the events are produced
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <[hidden email]> wrote:
>>
>>> Many thanks to Jiangang for bringing this up, and thanks for the
>>> discussion!
>>>
>>> I also agree with the summarization by Xintong and Jing that control
>>> flow seems to be a common building block for many functionalities, and a
>>> dynamic configuration framework is a representative application that is
>>> frequently required by users. Regarding the control flow, currently we are
>>> also considering the design of iteration for flink-ml, and as Xintong has
>>> pointed out, it also requires the control flow in cases like detecting
>>> global termination inside the iteration (in this case we need to broadcast
>>> an event through the iteration body to detect whether there are still
>>> records residing in the iteration body). And regarding whether to implement
>>> the dynamic configuration framework, I also agree with Xintong that the
>>> consistency guarantee would be a point to consider; we might consider
>>> whether we need to ensure every operator could receive the dynamic
>>> configuration.
>>>
>>> Best,
>>> Yun
>>>
>>>
>>>
>>> ------------------------------------------------------------------
>>> Sender: kai wang <[hidden email]>
>>> Date: 2021/06/08 11:52:12
>>> Recipient: JING ZHANG <[hidden email]>
>>> Cc: 刘建刚 <[hidden email]>; Xintong Song [via Apache Flink User Mailing
>>> List archive.] <[hidden email]>; user <[hidden email]>; dev <[hidden email]>
>>> Theme: Re: Add control mode for flink
>>>
>>>
>>>
>>> I'm big +1 for this feature.
>>>
>>>    1. Limit the input qps.
>>>    2. Change log level for debug.
>>>
>>> In my team, the two examples above are needed.
>>>
>>> JING ZHANG <[hidden email]> wrote on Tue, Jun 8, 2021 at 11:18 AM:
>>>
>>>> Thanks Jiangang for bringing this up.
>>>> As mentioned in Jiangang's email, a `dynamic configuration framework`
>>>> provides many useful functions in Kuaishou, because it can update job
>>>> behavior without relaunching the job. These functions are very popular in
>>>> Kuaishou, and we have also seen similar demands on the mailing list [1].
>>>>
>>>> I'm big +1 for this feature.
>>>>
>>>> Thanks Xintong and Yun for deep thoughts about the issue. I like the
>>>> idea about introducing control mode in Flink.
>>>> It takes the original issue a big step closer to its essence, and also
>>>> opens the possibility for more great features, as mentioned in Xintong's
>>>> and Jark's responses.
>>>> Based on the idea, there are at least two milestones to achieve the
>>>> goals which were proposed by Jiangang:
>>>> (1) Build a common control flow framework in Flink.
>>>>      It focuses on control flow propagation. And, how to integrate the
>>>> common control flow framework with existing mechanisms.
>>>> (2) Build a dynamic configuration framework which is exposed to users
>>>> directly.
>>>>      We could see the dynamic configuration framework as an application
>>>> on top of the underlying control flow framework.
>>>>      It focuses on the Public API which receives configuration updating
>>>> requests from users. Besides, it is necessary to introduce an API
>>>> protection mechanism to avoid job performance degradation caused by too
>>>> many control events.
>>>>
>>>> I suggest splitting the whole design into two after we reach a
>>>> consensus on whether to introduce this feature, because both sub-topics
>>>> need careful design.
>>>>
>>>>
>>>> [
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html
>>>> ]
>>>>
>>>> Best regards,
>>>> JING ZHANG
>>>>
>>>> 刘建刚 <[hidden email]> wrote on Tue, Jun 8, 2021 at 10:01 AM:
>>>>
>>>>> Thanks Xintong Song for the detailed supplement. Since Flink jobs are
>>>>> long-running, they are similar to many services, so interacting with a
>>>>> job or controlling it is a common desire. This was our initial thought
>>>>> when implementing the feature. In our internal Flink, many configs used
>>>>> in the YAML file can be adjusted dynamically to avoid restarting the job,
>>>>> for example:
>>>>>
>>>>>    1. Limit the input qps.
>>>>>    2. Degrade the job by sampling and so on.
>>>>>    3. Reset kafka offset in certain cases.
>>>>>    4. Stop checkpoint in certain cases.
>>>>>    5. Control the history consuming.
>>>>>    6. Change log level for debug.
>>>>>
>>>>>
>>>>> After deep discussion, we realized that a common control flow
>>>>> will benefit both users and developers. Dynamic config is just one of the
>>>>> use cases. For the concrete design and implementation, it relates to many
>>>>> components, like the jobmaster, network channels, operators and so on,
>>>>> which needs deeper consideration and design.
>>>>>
>>>>> Xintong Song [via Apache Flink User Mailing List archive.] <[hidden
>>>>> email]> wrote on Mon, Jun 7, 2021 at 2:52 PM:
>>>>>
>>>>>> Thanks Jiangang for bringing this up, and Steven & Peter for the
>>>>>> feedback.
>>>>>>
>>>>>> I was part of the preliminary offline discussions before this
>>>>>> proposal went public. So maybe I can help clarify things a bit.
>>>>>>
>>>>>> In short, although the phrase "control mode" might be a bit misleading,
>>>>>> what we truly want to do is to make the concept of "control flow"
>>>>>> explicit and expose it to users.

Re: Add control mode for flink

Paul Lam
In reply to this post by Steven Wu
+1 for this feature. Setting up a separate control stream is too much for many use cases; it would be very helpful if users could leverage the built-in control flow of Flink.

My 2 cents:
1. @Steven IMHO, producing control events from the JobMaster is similar to triggering a savepoint. The REST API is non-blocking, and users should poll the results to confirm the operation has succeeded. If something goes wrong, it's the user's responsibility to retry.
2. There are two kinds of existing special elements: special stream records (e.g. watermarks) and events (e.g. checkpoint barriers). They all flow through the whole DAG, but events need to be acknowledged by downstream tasks and can overtake records, while stream records cannot. So I'm wondering if we plan to unify the two approaches in the new control flow (as Xintong mentioned both in the previous mails)?
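The non-blocking trigger-and-poll interaction in point 1 can be sketched like this in plain Java (the class and the REST paths in the comments are hypothetical, only modeled after how savepoint triggering behaves):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the non-blocking trigger-and-poll pattern: the
// trigger call returns an id immediately, and the client polls that id
// until a result is reported.
class TriggerRegistry {
    enum Status { UNKNOWN, IN_PROGRESS, COMPLETED, FAILED }

    private final Map<Long, Status> operations = new ConcurrentHashMap<>();
    private final AtomicLong ids = new AtomicLong();

    // e.g. POST /jobs/:jobid/control (hypothetical endpoint) returns a trigger id.
    long trigger() {
        long id = ids.incrementAndGet();
        operations.put(id, Status.IN_PROGRESS);
        return id;
    }

    // e.g. GET /jobs/:jobid/control/:triggerid (hypothetical) reports progress.
    Status poll(long id) {
        return operations.getOrDefault(id, Status.UNKNOWN);
    }

    // Set by the JobMaster once the control signal has been applied (or failed).
    void complete(long id, Status result) {
        operations.put(id, result);
    }
}
```

As with savepoints, the contract is that a trigger id by itself promises nothing; only a polled COMPLETED status confirms the signal took effect, and the caller retries otherwise.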

Best,
Paul Lam

2021年6月8日 14:08,Steven Wu <[hidden email]> 写道:


I can see the benefits of control flow. E.g., it might help the old (and inactive) FLIP-17 side input. I would suggest that we add more details of some of the potential use cases.

Here is one mismatch with using control flow for dynamic config. Dynamic config is typically targeted/loaded by one specific operator. Control flow will propagate the dynamic config to all operators. not a problem per se 

Regarding using the REST api (to jobmanager) for accepting control signals from external system, where are we going to persist/checkpoint the signal? jobmanager can die before the control signal is propagated and checkpointed. Did we lose the control signal in this case?


On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <[hidden email]> wrote:
+1 on separating the effort into two steps:
  1. Introduce a common control flow framework, with flexible interfaces for generating / reacting to control messages for various purposes.
  2. Features that leverating the control flow can be worked on concurrently
Meantime, keeping collecting potential features that may leverage the control flow should be helpful. It provides good inputs for the control flow framework design, to make the framework common enough to cover the potential use cases.

My suggestions on the next steps:
  1. Allow more time for opinions to be heard and potential use cases to be collected
  2. Draft a FLIP with the scope of common control flow framework
  3. We probably need a poc implementation to make sure the framework covers at least the following scenarios
    1. Produce control events from arbitrary operators
    2. Produce control events from JobMaster
    3. Consume control events from arbitrary operators downstream where the events are produced

Thank you~
Xintong Song



On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <[hidden email]> wrote:
Very thanks Jiangang for bringing this up and very thanks for the discussion! 

I also agree with the summarization by Xintong and Jing that control flow seems to be
a common buidling block for many functionalities and dynamic configuration framework
is a representative application that frequently required by users. Regarding the control flow, 
currently we are also considering the design of iteration for the flink-ml, and as Xintong has pointed
out, it also required the control flow in cases like detection global termination inside the iteration
 (in this case we need to broadcast an event through the iteration body to detect if there are still 
records reside in the iteration body). And regarding  whether to implement the dynamic configuration 
framework, I also agree with Xintong that the consistency guarantee would be a point to consider, we 
might consider if we need to ensure every operator could receive the dynamic configuration. 

Best,
Yun



------------------------------------------------------------------
Sender:kai wang<[hidden email]>
Date:2021/06/08 11:52:12
Recipient:JING ZHANG<[hidden email]>
Cc:刘建刚<[hidden email]>; Xintong Song [via Apache Flink User Mailing List archive.]<[hidden email]>; user<[hidden email]>; dev<[hidden email]>
Theme:Re: Add control mode for flink



I'm big +1 for this feature. 
  1. Limit the input qps.
  2. Change log level for debug.
in my team, the two examples above are needed

JING ZHANG <[hidden email]> 于2021年6月8日周二 上午11:18写道:
Thanks Jiangang for bringing this up. 
As mentioned in Jiangang's email, `dynamic configuration framework` provides many useful functions in Kuaishou, because it could update job behavior without relaunching the job. The functions are very popular in Kuaishou, we also see similar demands in maillist [1].

I'm big +1 for this feature.

Thanks Xintong and Yun for deep thoughts about the issue. I like the idea about introducing control mode in Flink. 
It takes the original issue a big step closer to essence which also provides the possibility for more fantastic features as mentioned in Xintong and Jark's response.
Based on the idea, there are at least two milestones to achieve the goals which were proposed by Jiangang:
(1) Build a common control flow framework in Flink. 
     It focuses on control flow propagation. And, how to integrate the common control flow framework with existing mechanisms.
(2) Builds a dynamic configuration framework which is exposed to users directly. 
     We could see dynamic configuration framework is a top application on the underlying control flow framework. 
     It focuses on the Public API which receives configuration updating requests from users. Besides, it is necessary to introduce an API protection mechanism to avoid job performance degradation caused by too many control events.

I suggest splitting the whole design into two after we reach a consensus on whether to introduce this feature because these two sub-topic all need careful design.



Best regards,
JING ZHANG

刘建刚 <[hidden email]> 于2021年6月8日周二 上午10:01写道:
Thanks Xintong Song for the detailed supplement. Since flink is long-running, it is similar to many services. So interacting with it or controlling it is a common desire. This was our initial thought when implementing the feature. In our inner flink, many configs used in yaml can be adjusted by dynamic to avoid restarting the job, for examples as follow:
  1. Limit the input qps.
  2. Degrade the job by sampling and so on.
  3. Reset kafka offset in certain cases.
  4. Stop checkpoint in certain cases.
  5. Control the history consuming.
  6. Change log level for debug.

After deep discussion, we realize that a common control flow will benefit both users and developers. Dynamic config is just one of the use cases. For the concrete design and implementation, it relates with many components, like jobmaster, network channel, operators and so on, which needs deeper consideration and design. 

Xintong Song [via Apache Flink User Mailing List archive.] <[hidden email]> 于2021年6月7日周一 下午2:52写道:
Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.

I was part of the preliminary offline discussions before this proposal went public. So maybe I can help clarify things a bit.

In short, despite the phrase "control mode" might be a bit misleading, what we truly want to do from my side is to make the concept of "control flow" explicit and expose it to users.

## Background
Jiangang & his colleagues at Kuaishou maintain an internal version of Flink. One of their custom features is allowing dynamically changing operator behaviors via the REST APIs. He's willing to contribute this feature to the community, and came to Yun Gao and me for suggestions. After discussion, we feel that the underlying question to be answered is how do we model the control flow in Flink. Dynamically controlling jobs via REST API can be one of the features built on top of the control flow, and there could be others.

## Control flow
Control flow refers to the communication channels for sending events/signals to/between tasks/operators, that changes Flink's behavior in a way that may or may not affect the computation logic. Typical control events/signals Flink currently has are watermarks and checkpoint barriers. 

In general, for modeling control flow, the following questions should be considered.
1. Who (which component) is responsible for generating the control messages?
2. Who (which component) is responsible for reacting to the messages.
3. How do the messages propagate?
4. When it comes to affecting the computation logics, how should the control flow work together with the exact-once consistency. 

1) & 2) may vary depending on the use cases, while 3) & 4) probably share many things in common. A unified control flow model would help deduplicate the common logics, allowing us to focus on the use case specific parts.

E.g.,
- Watermarks: generated by source operators, handled by window operators.
- Checkpoint barrier: generated by the checkpoint coordinator, handled by all tasks
- Dynamic controlling: generated by JobMaster (in reaction to the REST command), handled by specific operators/UDFs
- Operator defined events: The following features are still in planning, but may potentially benefit from the control flow model. (Please correct me if I'm wrong, @Yun, @Jark)
  * Iteration: When a certain condition is met, we might want to signal downstream operators with an event
  * Mini-batch assembling: Flink currently uses special watermarks for indicating the end of each mini-batch, which makes it tricky to deal with event time related computations.
  * Hive dimension table join: For periodically reloaded hive tables, it would be helpful to have specific events signaling that a reloading is finished.
  * Bootstrap dimension table join: This is similar to the previous one. In cases where we want to fully load the dimension table before starting joining the mainstream, it would be helpful to have an event signaling the finishing of the bootstrap.
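To make questions 1)-3) above concrete, here is a minimal, self-contained Java sketch of a unified event model. All class and method names here are hypothetical, invented purely for illustration; none of them are Flink APIs:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink APIs: a shared event envelope that any
// component (source, JobMaster, operator) can generate (question 1).
interface ControlEvent {}

final class ConfigUpdate implements ControlEvent {
    final String key;
    final String value;
    ConfigUpdate(String key, String value) { this.key = key; this.value = value; }
}

final class BarrierEvent implements ControlEvent {
    final long checkpointId;
    BarrierEvent(long checkpointId) { this.checkpointId = checkpointId; }
}

// An operator reacts only to the event types it understands (question 2)
// and forwards every event downstream unchanged (question 3).
class SketchOperator {
    private final Map<String, String> config = new HashMap<>();
    private final List<ControlEvent> forwarded = new ArrayList<>();

    void onControlEvent(ControlEvent e) {
        if (e instanceof ConfigUpdate) {
            ConfigUpdate u = (ConfigUpdate) e;
            config.put(u.key, u.value);   // use-case-specific reaction
        }
        forwarded.add(e);                 // common propagation logic
    }

    String config(String key) { return config.get(key); }
    int forwardedCount() { return forwarded.size(); }
}
```

The use-case-specific part is only the `instanceof` branch; the event envelope and the propagation are shared, which is exactly the deduplication a unified model would buy.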

## Dynamic REST controlling
Back to the specific feature that Jiangang proposed, I personally think it's quite convenient. Currently, to dynamically change the behavior of an operator, we need to set up a separate source for the control events and leverage broadcast state. Being able to send the events via REST APIs definitely improves the usability.

Leveraging dynamic configuration frameworks is for sure one possible approach. The reasons we are in favor of introducing the control flow are:
- It benefits not only this specific dynamic controlling feature, but potentially other future features as well.
- AFAICS, it's non-trivial to make a 3rd-party dynamic configuration framework work together with Flink's consistency mechanism.

Thank you~
Xintong Song



On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]> wrote:
Thank you for the reply. I have checked the post you mentioned. The dynamic config may be useful sometimes, but it is hard to keep data consistent in Flink; for example, what happens if a dynamic config change takes effect during a failover? Since dynamic config is something users want, maybe Flink can support it in some way.

For the control mode, dynamic config is just one of the control modes. In the Google doc, I have listed some other cases; for example, control events generated in operators or external services. Besides the user's dynamic config, the Flink system can support some common dynamic configuration, like qps limits, checkpoint control and so on.

The control mode structure needs a good design. Based on that, other control features can be added easily later, like changing the log level while the job is running. In the end, Flink will not just process data, but also interact with users to receive control events, like a service.
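The failover concern can be illustrated with a toy sketch in plain Java (not Flink APIs; all names invented for illustration): if the operator snapshots the currently applied config together with its checkpointed state, recovery restores the last applied value instead of silently dropping the dynamic change.

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration, not Flink APIs: dynamic config is kept consistent
// across failover by snapshotting it with the operator's checkpointed
// state and restoring it on recovery.
class StatefulConfigOperator {
    private Map<String, String> config = new HashMap<>();

    void applyConfig(String key, String value) {  // a control event arrives
        config.put(key, value);
    }

    Map<String, String> snapshotState() {          // taken at checkpoint time
        return new HashMap<>(config);
    }

    void restoreState(Map<String, String> checkpointed) {  // after failover
        this.config = new HashMap<>(checkpointed);
    }

    String get(String key) { return config.get(key); }
}
```

Any config change applied after the last checkpoint is lost on recovery, which is why a 3rd-party config framework sitting outside the checkpoint mechanism cannot give this guarantee on its own.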

Steven Wu <[hidden email]> wrote on Fri, Jun 4, 2021 at 11:11 PM:
I am not sure if we should solve this problem in Flink. This is more like a dynamic config problem that probably should be solved by some configuration framework. Here is one post from google search: https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a

On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 <[hidden email]> wrote:
Hi everyone,
      Flink jobs are always long-running. While a job is running, users may want to control it without stopping it. The reasons for control can differ, as follows:
  1. Change the data processing logic, such as a filter condition.
  2. Send trigger events to move the progress forward.
  3. Provide tools to degrade the job, such as limiting the input qps or sampling data.
  4. Change the log level to debug a current problem.
      The common way to do this is to stop the job, make the modifications, and start the job again. It may take a long time to recover. In some situations, stopping jobs is intolerable, for example, when the job is related to money or important activities. So we need some mechanism to control a running job without stopping it.

We propose to add a control mode for Flink. A control mode based on a REST interface is introduced first. It works in these steps:

  1. The user predefines some logic which supports config control, such as a filter condition.
  2. Run the job.
  3. If the user wants to change the job's running logic, they just send a REST request with the corresponding config.
Other control modes will also be considered in the future. More details can be found in the doc https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing . If the community likes the proposal, more discussion is needed and a more detailed design will be given later. Any suggestions and ideas are welcome.
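As a rough illustration of step 1, the predefined config-controllable logic could look like the following plain-Java sketch (all names are hypothetical; the REST plumbing of step 3 is omitted):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntPredicate;

// Hypothetical sketch of step 1: a filter operator whose condition can
// be swapped at runtime. A REST handler (step 3) would call
// updateThreshold(); the per-record hot path only reads the reference.
class ControllableFilter {
    private final AtomicReference<IntPredicate> condition =
            new AtomicReference<>(x -> true);  // initial config: pass everything

    boolean accept(int value) {                 // invoked per record
        return condition.get().test(value);
    }

    void updateThreshold(int threshold) {       // invoked by a control request
        condition.set(x -> x > threshold);
    }
}
```

The `AtomicReference` swap keeps the record path lock-free while letting the control path replace the whole predicate atomically.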






Re: Add control mode for flink

Steven Wu
> producing control events from JobMaster is similar to triggering a savepoint.

Paul, here is the difference as I see it. Upon job or jobmanager recovery, we don't need to recover and replay the savepoint trigger signal.

On Tue, Jun 8, 2021 at 8:20 PM Paul Lam <[hidden email]> wrote:
+1 for this feature. Setting up a separate control stream is too much for many use cases; it would be very helpful if users could leverage the built-in control flow of Flink.

My 2 cents:
1. @Steven IMHO, producing control events from the JobMaster is similar to triggering a savepoint. The REST API is non-blocking, and users should poll the results to confirm the operation succeeded. If something goes wrong, it’s the user’s responsibility to retry.
2. There are two kinds of existing special elements: special stream records (e.g. watermarks) and events (e.g. checkpoint barriers). They all flow through the whole DAG, but events need to be acknowledged by downstream tasks and can overtake records, while stream records cannot. So I’m wondering if we plan to unify the two approaches in the new control flow (as Xintong mentioned both in the previous mails)?

Best,
Paul Lam

On Jun 8, 2021, at 14:08, Steven Wu <[hidden email]> wrote:


I can see the benefits of control flow. E.g., it might help the old (and inactive) FLIP-17 side input. I would suggest that we add more details on some of the potential use cases.

Here is one mismatch with using control flow for dynamic config: dynamic config is typically targeted at / loaded by one specific operator, while control flow would propagate the dynamic config to all operators. Not a problem per se, though.

Regarding using the REST API (to the jobmanager) for accepting control signals from external systems: where are we going to persist/checkpoint the signal? The jobmanager can die before the control signal is propagated and checkpointed. Do we lose the control signal in this case?


On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <[hidden email]> wrote:
+1 on separating the effort into two steps:
  1. Introduce a common control flow framework, with flexible interfaces for generating / reacting to control messages for various purposes.
  2. Features that leverage the control flow can be worked on concurrently.
Meanwhile, continuing to collect potential features that may leverage the control flow should be helpful. It provides good input for the control flow framework design, helping make the framework common enough to cover the potential use cases.

My suggestions on the next steps:
  1. Allow more time for opinions to be heard and potential use cases to be collected
  2. Draft a FLIP with the scope of common control flow framework
  3. We probably need a PoC implementation to make sure the framework covers at least the following scenarios:
    1. Produce control events from arbitrary operators
    2. Produce control events from the JobMaster
    3. Consume control events in arbitrary operators downstream of where the events are produced

Thank you~
Xintong Song



On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <[hidden email]> wrote:
Many thanks to Jiangang for bringing this up, and thanks for the discussion!

I also agree with the summarization by Xintong and Jing that control flow seems to be a common building block for many functionalities, and that a dynamic configuration framework is a representative application frequently required by users. Regarding the control flow, we are currently also considering the design of iteration for flink-ml, and as Xintong has pointed out, it also requires the control flow in cases like detecting global termination inside the iteration (in this case we need to broadcast an event through the iteration body to detect whether there are still records residing in the iteration body). And regarding whether to implement the dynamic configuration framework, I also agree with Xintong that the consistency guarantee would be a point to consider; we might consider whether we need to ensure that every operator receives the dynamic configuration.

Best,
Yun



------------------------------------------------------------------
Sender:kai wang<[hidden email]>
Date:2021/06/08 11:52:12
Recipient:JING ZHANG<[hidden email]>
Cc:刘建刚<[hidden email]>; Xintong Song [via Apache Flink User Mailing List archive.]<[hidden email]>; user<[hidden email]>; dev<[hidden email]>
Subject: Re: Add control mode for flink



I'm a big +1 for this feature. In my team, the two examples below are needed:
  1. Limit the input qps.
  2. Change the log level for debugging.

JING ZHANG <[hidden email]> wrote on Tue, Jun 8, 2021 at 11:18 AM:
Thanks Jiangang for bringing this up. 
As mentioned in Jiangang's email, the `dynamic configuration framework` provides many useful functions at Kuaishou, because it can update job behavior without relaunching the job. These functions are very popular at Kuaishou, and we also see similar demands on the mailing list [1].

I'm a big +1 for this feature.

Thanks Xintong and Yun for deep thoughts on the issue. I like the idea of introducing a control mode in Flink.
It takes the original issue a big step closer to its essence, and it also opens up the possibility of more features, as mentioned in Xintong's and Jark's responses.
Based on this idea, there are at least two milestones to achieve the goals proposed by Jiangang:
(1) Build a common control flow framework in Flink.
     It focuses on control flow propagation, and on how to integrate the common control flow framework with existing mechanisms.
(2) Build a dynamic configuration framework which is exposed to users directly.
     The dynamic configuration framework would be an application built on top of the underlying control flow framework.
     It focuses on the public API which receives configuration update requests from users. Besides, it is necessary to introduce an API protection mechanism to avoid job performance degradation caused by too many control events.

I suggest splitting the whole design into two parts after we reach a consensus on whether to introduce this feature, because both sub-topics need careful design.
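The API protection mechanism mentioned in milestone (2) could be as simple as a token bucket in front of the control endpoint. A hedged, self-contained Java sketch (all names invented for illustration):

```java
// Hypothetical sketch of an API protection mechanism: a token bucket in
// front of the control endpoint rejects control events beyond a
// configured rate, so callers cannot flood the job with control traffic.
class ControlRateLimiter {
    private final int permitsPerSecond;
    private double tokens;
    private long lastRefillNanos;

    ControlRateLimiter(int permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
        this.tokens = permitsPerSecond;     // start with a full bucket
        this.lastRefillNanos = System.nanoTime();
    }

    synchronized boolean tryAcquire() {
        long now = System.nanoTime();
        double refill = permitsPerSecond * (now - lastRefillNanos) / 1_000_000_000.0;
        tokens = Math.min(permitsPerSecond, tokens + refill);
        lastRefillNanos = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;                       // caller should reject the request
    }
}
```

A rejected request would surface as an error on the REST response, leaving the job itself unaffected.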



Best regards,
JING ZHANG

刘建刚 <[hidden email]> wrote on Tue, Jun 8, 2021 at 10:01 AM:
Thanks Xintong Song for the detailed supplement. Since Flink is long-running, it is similar to many services, so interacting with it or controlling it is a common desire. This was our initial thought when implementing the feature. In our internal Flink, many configs used in the yaml can be adjusted dynamically to avoid restarting the job, for example:
  1. Limit the input qps.
  2. Degrade the job by sampling and so on.
  3. Reset the kafka offset in certain cases.
  4. Stop checkpointing in certain cases.
  5. Control the history consuming.
  6. Change the log level for debugging.

After deep discussion, we realized that a common control flow will benefit both users and developers. Dynamic config is just one of the use cases. The concrete design and implementation relate to many components, like the jobmaster, network channels, operators and so on, which need deeper consideration and design.



Re: Add control mode for flink

Xintong Song
> 2. There are two kinds of existing special elements: special stream records (e.g. watermarks) and events (e.g. checkpoint barriers). They all flow through the whole DAG, but events need to be acknowledged by downstream tasks and can overtake records, while stream records cannot. So I’m wondering if we plan to unify the two approaches in the new control flow (as Xintong mentioned both in the previous mails)?

TBH, I don't really know yet. We feel that the control flow is a non-trivial topic and it would be better to bring it up publicly as early as possible, while the concrete plan is still on the way.

Personally, I'm leaning towards not touching the existing watermarks and checkpoint barriers in the first step.
- I'd expect the control flow to be introduced as an experimental feature that takes time to stabilize. It would be better that the existing important features like checkpointing and watermarks stay unaffected.
- Checkpoint barriers are a little different, as other control messages would somehow rely on them to achieve exactly-once consistency. Without a concrete design, I'm not entirely sure whether they can be properly modeled as a special case of general control messages.
- Watermarks are probably similar to the other control messages. However, they are already exposed to users as public APIs. If we want to migrate them to the new control flow, we'd have to be very careful not to break any compatibility.


Thank you~

Xintong Song



On Wed, Jun 9, 2021 at 11:30 AM Steven Wu <[hidden email]> wrote:
> producing control events from JobMaster is similar to triggering a savepoint.

Paul, here is what I see the difference. Upon job or jobmanager recovery, we don't need to recover and replay the savepoint trigger signal.

On Tue, Jun 8, 2021 at 8:20 PM Paul Lam <[hidden email]> wrote:
+1 for this feature. Setting up a separate control stream is too much for many use cases, it would very helpful if users can leverage the built-in control flow of Flink.

My 2 cents:
1. @Steven IMHO, producing control events from JobMaster is similar to triggering a savepoint. The REST api is non-blocking, and users should poll the results to confirm the operation is succeeded. If something goes wrong, it’s user’s responsibility to retry.
2. There are two kinds of existing special elements, special stream records (e.g. watermarks) and events (e.g. checkpoint barrier). They all flow through the whole DAG, but events needs to be acknowledged by downstream and can overtake records, while stream records are not). So I’m wondering if we plan to unify the two approaches in the new control flow (as Xintong mentioned both in the previous mails)?

Best,
Paul Lam

2021年6月8日 14:08,Steven Wu <[hidden email]> 写道:


I can see the benefits of control flow. E.g., it might help the old (and inactive) FLIP-17 side input. I would suggest that we add more details of some of the potential use cases.

Here is one mismatch with using control flow for dynamic config. Dynamic config is typically targeted/loaded by one specific operator. Control flow will propagate the dynamic config to all operators. not a problem per se 

Regarding using the REST api (to jobmanager) for accepting control signals from external system, where are we going to persist/checkpoint the signal? jobmanager can die before the control signal is propagated and checkpointed. Did we lose the control signal in this case?


On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <[hidden email]> wrote:
+1 on separating the effort into two steps:
  1. Introduce a common control flow framework, with flexible interfaces for generating / reacting to control messages for various purposes.
  2. Features that leverating the control flow can be worked on concurrently
Meantime, keeping collecting potential features that may leverage the control flow should be helpful. It provides good inputs for the control flow framework design, to make the framework common enough to cover the potential use cases.

My suggestions on the next steps:
  1. Allow more time for opinions to be heard and potential use cases to be collected
  2. Draft a FLIP with the scope of common control flow framework
  3. We probably need a poc implementation to make sure the framework covers at least the following scenarios
    1. Produce control events from arbitrary operators
    2. Produce control events from JobMaster
    3. Consume control events from arbitrary operators downstream where the events are produced

Thank you~
Xintong Song



On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <[hidden email]> wrote:
Very thanks Jiangang for bringing this up and very thanks for the discussion! 

I also agree with the summarization by Xintong and Jing that control flow seems to be
a common buidling block for many functionalities and dynamic configuration framework
is a representative application that frequently required by users. Regarding the control flow, 
currently we are also considering the design of iteration for the flink-ml, and as Xintong has pointed
out, it also required the control flow in cases like detection global termination inside the iteration
 (in this case we need to broadcast an event through the iteration body to detect if there are still 
records reside in the iteration body). And regarding  whether to implement the dynamic configuration 
framework, I also agree with Xintong that the consistency guarantee would be a point to consider, we 
might consider if we need to ensure every operator could receive the dynamic configuration. 

Best,
Yun



------------------------------------------------------------------
Sender:kai wang<[hidden email]>
Date:2021/06/08 11:52:12
Recipient:JING ZHANG<[hidden email]>
Cc:刘建刚<[hidden email]>; Xintong Song [via Apache Flink User Mailing List archive.]<[hidden email]>; user<[hidden email]>; dev<[hidden email]>
Theme:Re: Add control mode for flink



I'm big +1 for this feature. 
  1. Limit the input qps.
  2. Change log level for debug.
in my team, the two examples above are needed

JING ZHANG <[hidden email]> 于2021年6月8日周二 上午11:18写道:
Thanks Jiangang for bringing this up. 
As mentioned in Jiangang's email, `dynamic configuration framework` provides many useful functions in Kuaishou, because it could update job behavior without relaunching the job. The functions are very popular in Kuaishou, we also see similar demands in maillist [1].

I'm big +1 for this feature.

Thanks Xintong and Yun for deep thoughts about the issue. I like the idea about introducing control mode in Flink. 
It takes the original issue a big step closer to essence which also provides the possibility for more fantastic features as mentioned in Xintong and Jark's response.
Based on the idea, there are at least two milestones to achieve the goals which were proposed by Jiangang:
(1) Build a common control flow framework in Flink. 
     It focuses on control flow propagation. And, how to integrate the common control flow framework with existing mechanisms.
(2) Builds a dynamic configuration framework which is exposed to users directly. 
     We could see dynamic configuration framework is a top application on the underlying control flow framework. 
     It focuses on the Public API which receives configuration updating requests from users. Besides, it is necessary to introduce an API protection mechanism to avoid job performance degradation caused by too many control events.

I suggest splitting the whole design into two after we reach a consensus on whether to introduce this feature because these two sub-topic all need careful design.



Best regards,
JING ZHANG

刘建刚 <[hidden email]> 于2021年6月8日周二 上午10:01写道:
Thanks Xintong Song for the detailed supplement. Since flink is long-running, it is similar to many services. So interacting with it or controlling it is a common desire. This was our initial thought when implementing the feature. In our inner flink, many configs used in yaml can be adjusted by dynamic to avoid restarting the job, for examples as follow:
  1. Limit the input qps.
  2. Degrade the job by sampling and so on.
  3. Reset kafka offset in certain cases.
  4. Stop checkpoint in certain cases.
  5. Control the history consuming.
  6. Change log level for debug.

After deep discussion, we realize that a common control flow will benefit both users and developers. Dynamic config is just one of the use cases. For the concrete design and implementation, it relates with many components, like jobmaster, network channel, operators and so on, which needs deeper consideration and design. 

Xintong Song [via Apache Flink User Mailing List archive.] <[hidden email]> 于2021年6月7日周一 下午2:52写道:
Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.

I was part of the preliminary offline discussions before this proposal went public. So maybe I can help clarify things a bit.

In short, despite the phrase "control mode" might be a bit misleading, what we truly want to do from my side is to make the concept of "control flow" explicit and expose it to users.

## Background
Jiangang & his colleagues at Kuaishou maintain an internal version of Flink. One of their custom features is allowing dynamically changing operator behaviors via the REST APIs. He's willing to contribute this feature to the community, and came to Yun Gao and me for suggestions. After discussion, we feel that the underlying question to be answered is how do we model the control flow in Flink. Dynamically controlling jobs via REST API can be one of the features built on top of the control flow, and there could be others.

## Control flow
Control flow refers to the communication channels for sending events/signals to/between tasks/operators, that changes Flink's behavior in a way that may or may not affect the computation logic. Typical control events/signals Flink currently has are watermarks and checkpoint barriers. 

In general, for modeling control flow, the following questions should be considered.
1. Who (which component) is responsible for generating the control messages?
2. Who (which component) is responsible for reacting to the messages.
3. How do the messages propagate?
4. When it comes to affecting the computation logics, how should the control flow work together with the exact-once consistency. 

1) & 2) may vary depending on the use cases, while 3) & 4) probably share many things in common. A unified control flow model would help deduplicate the common logics, allowing us to focus on the use case specific parts.

E.g.,
- Watermarks: generated by source operators, handled by window operators.
- Checkpoint barrier: generated by the checkpoint coordinator, handled by all tasks
- Dynamic controlling: generated by JobMaster (in reaction to the REST command), handled by specific operators/UDFs
- Operator defined events: The following features are still in planning, but may potentially benefit from the control flow model. (Please correct me if I'm wrong, @Yun, @Jark)
  * Iteration: When a certain condition is met, we might want to signal downstream operators with an event
  * Mini-batch assembling: Flink currently uses special watermarks for indicating the end of each mini-batch, which makes it tricky to deal with event time related computations.
  * Hive dimension table join: For periodically reloaded hive tables, it would be helpful to have specific events signaling that a reloading is finished.
  * Bootstrap dimension table join: This is similar to the previous one. In cases where we want to fully load the dimension table before starting joining the mainstream, it would be helpful to have an event signaling the finishing of the bootstrap.

## Dynamic REST controlling
Back to the specific feature that Jiangang proposed, I personally think it's quite convenient. Currently, to dynamically change the behavior of an operator, we need to set up a separate source for the control events and leverage broadcast state. Being able to send the events via REST APIs definitely improves the usability.

Leveraging dynamic configuration frameworks is for sure one possible approach. The reason we are in favor of introducing the control flow is that:
- It benefits not only this specific dynamic controlling feature, but potentially other future features as well.
- AFAICS, it's non-trivial to make a 3rd-party dynamic configuration framework work together with Flink's consistency mechanism.

Thank you~
Xintong Song



On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]> wrote:
Thank you for the reply. I have checked the post you mentioned. The dynamic config may be useful sometimes. But it is hard to keep data consistent in flink, for example, what if the dynamic config will take effect when failover. Since dynamic config is a desire for users, maybe flink can support it in some way.

For the control mode, dynamic config is just one of the control modes. In the google doc, I have list some other cases. For example, control events are generated in operators or external services. Besides user's dynamic config, flink system can support some common dynamic configuration, like qps limit, checkpoint control and so on.

The control mode structure needs a good design. Based on that, other control features can be added easily later, like changing the log level while the job is running. In the end, Flink will not just process data, but will also interact with users and receive control events, like a service.








Re: Add control mode for flink

liujiangang
    Thanks for all the discussions and suggestions. Since the topic has been discussed for about a week, it is time to draw a conclusion; new ideas are still welcome at the same time.
    First, the topic started with use cases for the RESTful interface. The RESTful interface supports many useful interactions with users, for example the ones below. Compared with the broadcast API, it is an easy way to control the job.
  1. Change the data processing logic via dynamic configs, such as a filter condition.
  2. Provide tools to control the job, such as QPS limiting, sampling, changing the log level, and so on.
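A minimal sketch of what such a RESTful control surface could accept; the endpoint path, the config keys, and the 202-then-poll contract are assumptions for illustration, not an existing Flink API:

```python
import json

# In-memory stand-in for the job's dynamic config; keys are illustrative.
JOB_CONFIG = {"filter.threshold": 0, "log.level": "INFO"}

def handle_control_request(path, body):
    """Toy handler for a POST to a hypothetical /jobs/<id>/control endpoint.
    Returns (status, response).  202 means the request was accepted
    asynchronously; callers poll for the outcome."""
    if not path.endswith("/control"):
        return 404, {"error": "unknown endpoint"}
    try:
        update = json.loads(body)
    except ValueError:
        return 400, {"error": "malformed JSON"}
    unknown = [k for k in update if k not in JOB_CONFIG]
    if unknown:
        return 400, {"error": "unknown config keys: %s" % unknown}
    JOB_CONFIG.update(update)   # would become a control event to operators
    return 202, {"accepted": sorted(update)}
```

Validating keys up front matters: a rejected request should fail fast at the REST layer rather than propagate a bad config into the job.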
    Second, we broadened the topic to a control flow in order to support all kinds of control events beyond the use cases above. There is strong demand for custom (broadcast) events for iteration, SQL control events, and so on. As Xintong Song said, the key questions for the control flow are as follows:
  1. Who (which component) is responsible for generating the control messages? It may be the JobMaster, an operator itself, and so on.
  2. Who (which component) is responsible for reacting to the messages?
  3. How do the messages propagate? Flink should support sending control messages through its existing channels.
  4. When it comes to affecting the computation logic, how should the control flow work together with exactly-once consistency? Letting control messages flow from the sources to downstream tasks, so that they can reuse the checkpoint mechanism, may be a good idea.
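Questions 3 and 4 above can be illustrated with a toy propagation model: control messages enter at the sources and flow downstream in order with the data, like checkpoint barriers; every operator forwards every message, but only operators that declare interest react to it. All names are illustrative:

```python
# Toy control-message propagation through an operator chain.
class Operator:
    def __init__(self, name, handles=()):
        self.name = name
        self.handles = set(handles)  # control kinds this operator reacts to
        self.reacted = []            # control payloads this operator handled
        self.downstream = []

    def process(self, element):
        kind, payload = element
        if kind == "control" and payload["kind"] in self.handles:
            self.reacted.append(payload)     # react, e.g. update a config
        for op in self.downstream:           # always forward downstream
            op.process(element)

# Wire a small chain: source -> filter -> sink, where only the filter
# cares about the hypothetical "set-filter" control kind.
src = Operator("source")
flt = Operator("filter", handles={"set-filter"})
sink = Operator("sink")
src.downstream = [flt]
flt.downstream = [sink]
src.process(("control", {"kind": "set-filter", "threshold": 3}))
```

Note how this matches the point below that all operators receive upstream control messages but not all of them process them.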
    Third, a common and flexible control flow requires good design and implementation as a base. Both future features and existing features should be considered. For future features, a common RESTful interface is needed first to support dynamic configs. Among existing features, checkpoint barriers, watermarks, and latency markers each have some special behaviors but also share a lot in common. The common logic should be factored out, but maybe the existing elements should remain unchanged until the control flow is stable.
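One possible shape for that common/special split, sketched with hypothetical class names (not Flink's): the existing special elements become subtypes of a single control element, sharing propagation logic while keeping their special behaviors in overrides:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ControlElement:
    def overtakes_records(self):
        # Events like barriers may overtake buffered records; watermark-like
        # elements travel strictly in order with the data.
        return False

@dataclass(frozen=True)
class Watermark(ControlElement):
    timestamp: int

@dataclass(frozen=True)
class CheckpointBarrier(ControlElement):
    checkpoint_id: int
    def overtakes_records(self):
        return True   # cf. unaligned checkpoints

@dataclass(frozen=True)
class ConfigUpdate(ControlElement):
    key: str
    value: str
```

This is only a sketch of the idea; as discussed later in the thread, watermarks and barriers may well stay untouched until the control flow is stable.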
    Some other open problems are as follows:
  1. How do we persist the control signals when the JobMaster fails? One idea is to persist control signals in HighAvailabilityServices and replay them after failover. The RESTful request should be non-blocking.
  2. Should all the operators receive the control messages? All operators should have the ability to receive control messages from upstream operators, but maybe not process them. If we want to persist the control message state, all the subtasks belonging to one operator should have the same control events in order to rescale easily.
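Problem 1 can be sketched as follows: each accepted signal gets a sequence number and is appended to a durable log before the REST caller is acknowledged, and a recovering JobMaster replays the log in order. The list below merely stands in for storage backed by HighAvailabilityServices:

```python
# Toy persistence and replay of control signals across JobMaster failover.
class ControlSignalStore:
    def __init__(self, log=None):
        self.log = [] if log is None else log

    def accept(self, signal):
        seq = len(self.log) + 1
        self.log.append((seq, signal))   # persist before acknowledging
        return seq                        # handle the REST caller can poll on

def replay(log):
    # Replaying in sequence order keeps recovery deterministic even if
    # the log entries were fetched out of order.
    return [signal for _, signal in sorted(log)]
```

The returned sequence number also fits the non-blocking REST contract: the caller polls on it to confirm the signal took effect.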
    For the next step, I will draft a FLIP scoped to the common control flow framework. More discussions, ideas, and problems are still welcome.

Thank you~

Jiangang Liu








Xintong Song <[hidden email]> 于2021年6月9日周三 下午12:01写道:
>
> 2. There are two kinds of existing special elements, special stream
> records (e.g. watermarks) and events (e.g. checkpoint barrier). They all
> flow through the whole DAG, but events needs to be acknowledged by
> downstream and can overtake records, while stream records are not). So I’m
> wondering if we plan to unify the two approaches in the new control flow
> (as Xintong mentioned both in the previous mails)?
>

TBH, I don't really know yet. We feel that the control flow is a
non-trivial topic and it would be better to bring it up publicly as early
as possible, while the concrete plan is still on the way.

Personally, I'm leaning towards not touching the existing watermarks and
checkpoint barriers in the first step.
- I'd expect the control flow to be introduced as an experimental feature
that takes time to stabilize. It would be better that the existing
important features like checkpointing and watermarks stay unaffected.
- Checkpoint barriers are a little different, as other control messages
somehow rely on it to achieve exactly once consistency. Without the
concrete design, I'm not entirely sure whether it can be properly modeled
as a special case of general control messages.
- Watermarks are probably similar to the other control messages. However,
it's already exposed to users as public APIs. If we want to migrate it to
the new control flow, we'd be very careful not to break any compatibility.


Thank you~

Xintong Song



On Wed, Jun 9, 2021 at 11:30 AM Steven Wu <[hidden email]> wrote:

> > producing control events from JobMaster is similar to triggering a
> savepoint.
>
> Paul, here is what I see the difference. Upon job or jobmanager recovery,
> we don't need to recover and replay the savepoint trigger signal.
>
> On Tue, Jun 8, 2021 at 8:20 PM Paul Lam <[hidden email]> wrote:
>
>> +1 for this feature. Setting up a separate control stream is too much for
>> many use cases, it would very helpful if users can leverage the built-in
>> control flow of Flink.
>>
>> My 2 cents:
>> 1. @Steven IMHO, producing control events from JobMaster is similar to
>> triggering a savepoint. The REST api is non-blocking, and users should poll
>> the results to confirm the operation is succeeded. If something goes wrong,
>> it’s user’s responsibility to retry.
>> 2. There are two kinds of existing special elements, special stream
>> records (e.g. watermarks) and events (e.g. checkpoint barrier). They all
>> flow through the whole DAG, but events needs to be acknowledged by
>> downstream and can overtake records, while stream records are not). So I’m
>> wondering if we plan to unify the two approaches in the new control flow
>> (as Xintong mentioned both in the previous mails)?
>>
>> Best,
>> Paul Lam
>>
>> 2021年6月8日 14:08,Steven Wu <[hidden email]> 写道:
>>
>>
>> I can see the benefits of control flow. E.g., it might help the old (and
>> inactive) FLIP-17 side input. I would suggest that we add more details of
>> some of the potential use cases.
>>
>> Here is one mismatch with using control flow for dynamic config. Dynamic
>> config is typically targeted/loaded by one specific operator. Control flow
>> will propagate the dynamic config to all operators. not a problem per se
>>
>> Regarding using the REST api (to jobmanager) for accepting control
>> signals from external system, where are we going to persist/checkpoint the
>> signal? jobmanager can die before the control signal is propagated and
>> checkpointed. Did we lose the control signal in this case?
>>
>>
>> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <[hidden email]>
>> wrote:
>>
>>> +1 on separating the effort into two steps:
>>>
>>>    1. Introduce a common control flow framework, with flexible
>>>    interfaces for generating / reacting to control messages for various
>>>    purposes.
>>>    2. Features that leverating the control flow can be worked on
>>>    concurrently
>>>
>>> Meantime, keeping collecting potential features that may leverage the
>>> control flow should be helpful. It provides good inputs for the control
>>> flow framework design, to make the framework common enough to cover the
>>> potential use cases.
>>>
>>> My suggestions on the next steps:
>>>
>>>    1. Allow more time for opinions to be heard and potential use cases
>>>    to be collected
>>>    2. Draft a FLIP with the scope of common control flow framework
>>>    3. We probably need a poc implementation to make sure the framework
>>>    covers at least the following scenarios
>>>       1. Produce control events from arbitrary operators
>>>       2. Produce control events from JobMaster
>>>       3. Consume control events from arbitrary operators downstream
>>>       where the events are produced
>>>
>>>
>>> Thank you~
>>> Xintong Song
>>>
>>>
>>>
>>> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <[hidden email]> wrote:
>>>
>>>> Very thanks Jiangang for bringing this up and very thanks for the
>>>> discussion!
>>>>
>>>> I also agree with the summarization by Xintong and Jing that control
>>>> flow seems to be
>>>> a common buidling block for many functionalities and dynamic
>>>> configuration framework
>>>> is a representative application that frequently required by users.
>>>> Regarding the control flow,
>>>> currently we are also considering the design of iteration for the
>>>> flink-ml, and as Xintong has pointed
>>>> out, it also required the control flow in cases like detection global
>>>> termination inside the iteration
>>>>  (in this case we need to broadcast an event through the iteration
>>>> body to detect if there are still
>>>> records reside in the iteration body). And regarding  whether to
>>>> implement the dynamic configuration
>>>> framework, I also agree with Xintong that the consistency guarantee
>>>> would be a point to consider, we
>>>> might consider if we need to ensure every operator could receive the
>>>> dynamic configuration.
>>>>
>>>> Best,
>>>> Yun
>>>>
>>>>
>>>>
>>>> ------------------------------------------------------------------
>>>> Sender:kai wang<[hidden email]>
>>>> Date:2021/06/08 11:52:12
>>>> Recipient:JING ZHANG<[hidden email]>
>>>> Cc:刘建刚<[hidden email]>; Xintong Song [via Apache Flink User
>>>> Mailing List archive.]<[hidden email]>; user<
>>>> [hidden email]>; dev<[hidden email]>
>>>> Theme:Re: Add control mode for flink
>>>>
>>>>
>>>>
>>>> I'm big +1 for this feature.
>>>>
>>>>    1. Limit the input qps.
>>>>    2. Change log level for debug.
>>>>
>>>> in my team, the two examples above are needed
>>>>
>>>> JING ZHANG <[hidden email]> 于2021年6月8日周二 上午11:18写道:
>>>>
>>>>> Thanks Jiangang for bringing this up.
>>>>> As mentioned in Jiangang's email, `dynamic configuration framework`
>>>>> provides many useful functions in Kuaishou, because it could update job
>>>>> behavior without relaunching the job. The functions are very popular in
>>>>> Kuaishou, we also see similar demands in maillist [1].
>>>>>
>>>>> I'm big +1 for this feature.
>>>>>
>>>>> Thanks Xintong and Yun for deep thoughts about the issue. I like the
>>>>> idea about introducing control mode in Flink.
>>>>> It takes the original issue a big step closer to essence which also
>>>>> provides the possibility for more fantastic features as mentioned in
>>>>> Xintong and Jark's response.
>>>>> Based on the idea, there are at least two milestones to achieve the
>>>>> goals which were proposed by Jiangang:
>>>>> (1) Build a common control flow framework in Flink.
>>>>>      It focuses on control flow propagation. And, how to integrate the
>>>>> common control flow framework with existing mechanisms.
>>>>> (2) Builds a dynamic configuration framework which is exposed to users
>>>>> directly.
>>>>>      We could see dynamic configuration framework is a top application
>>>>> on the underlying control flow framework.
>>>>>      It focuses on the Public API which receives configuration
>>>>> updating requests from users. Besides, it is necessary to introduce an API
>>>>> protection mechanism to avoid job performance degradation caused by too
>>>>> many control events.
>>>>>
>>>>> I suggest splitting the whole design into two after we reach a
>>>>> consensus on whether to introduce this feature because these two sub-topic
>>>>> all need careful design.
>>>>>
>>>>>
>>>>> [
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html
>>>>> ]
>>>>>
>>>>> Best regards,
>>>>> JING ZHANG
>>>>>
>>>>> 刘建刚 <[hidden email]> 于2021年6月8日周二 上午10:01写道:
>>>>>
>>>>>> Thanks Xintong Song for the detailed supplement. Since flink is
>>>>>> long-running, it is similar to many services. So interacting with it or
>>>>>> controlling it is a common desire. This was our initial thought when
>>>>>> implementing the feature. In our inner flink, many configs used in yaml can
>>>>>> be adjusted by dynamic to avoid restarting the job, for examples as follow:
>>>>>>
>>>>>>    1. Limit the input qps.
>>>>>>    2. Degrade the job by sampling and so on.
>>>>>>    3. Reset kafka offset in certain cases.
>>>>>>    4. Stop checkpoint in certain cases.
>>>>>>    5. Control the history consuming.
>>>>>>    6. Change log level for debug.
>>>>>>
>>>>>>
>>>>>> After deep discussion, we realize that a common control flow
>>>>>> will benefit both users and developers. Dynamic config is just one of the
>>>>>> use cases. For the concrete design and implementation, it relates with many
>>>>>> components, like jobmaster, network channel, operators and so on, which
>>>>>> needs deeper consideration and design.
>>>>>>
>>>>>> Xintong Song [via Apache Flink User Mailing List archive.] <
>>>>>> [hidden email]> 于2021年6月7日周一 下午2:52写道:
>>>>>>
>>>>>>> Thanks Jiangang for bringing this up, and Steven & Peter for the
>>>>>>> feedback.
>>>>>>>
>>>>>>> I was part of the preliminary offline discussions before this
>>>>>>> proposal went public. So maybe I can help clarify things a bit.
>>>>>>>
>>>>>>> In short, despite the phrase "control mode" might be a bit
>>>>>>> misleading, what we truly want to do from my side is to make the concept of
>>>>>>> "control flow" explicit and expose it to users.
>>>>>>>
>>>>>>> ## Background
>>>>>>> Jiangang & his colleagues at Kuaishou maintain an internal version
>>>>>>> of Flink. One of their custom features is allowing dynamically changing
>>>>>>> operator behaviors via the REST APIs. He's willing to contribute this
>>>>>>> feature to the community, and came to Yun Gao and me for suggestions. After
>>>>>>> discussion, we feel that the underlying question to be answered is how do
>>>>>>> we model the control flow in Flink. Dynamically controlling jobs via REST
>>>>>>> API can be one of the features built on top of the control flow, and there
>>>>>>> could be others.
>>>>>>>
>>>>>>> ## Control flow
>>>>>>> Control flow refers to the communication channels for sending
>>>>>>> events/signals to/between tasks/operators, that changes Flink's behavior in
>>>>>>> a way that may or may not affect the computation logic. Typical control
>>>>>>> events/signals Flink currently has are watermarks and checkpoint barriers.
>>>>>>>
>>>>>>> In general, for modeling control flow, the following questions
>>>>>>> should be considered.
>>>>>>> 1. Who (which component) is responsible for generating the control
>>>>>>> messages?
>>>>>>> 2. Who (which component) is responsible for reacting to the messages.
>>>>>>> 3. How do the messages propagate?
>>>>>>> 4. When it comes to affecting the computation logics, how should the
>>>>>>> control flow work together with the exact-once consistency.
>>>>>>>
>>>>>>> 1) & 2) may vary depending on the use cases, while 3) & 4) probably
>>>>>>> share many things in common. A unified control flow model would help
>>>>>>> deduplicate the common logics, allowing us to focus on the use case
>>>>>>> specific parts.
>>>>>>>
>>>>>>> E.g.,
>>>>>>> - Watermarks: generated by source operators, handled by window
>>>>>>> operators.
>>>>>>> - Checkpoint barrier: generated by the checkpoint coordinator,
>>>>>>> handled by all tasks
>>>>>>> - Dynamic controlling: generated by JobMaster (in reaction to the
>>>>>>> REST command), handled by specific operators/UDFs
>>>>>>> - Operator defined events: The following features are still in
>>>>>>> planning, but may potentially benefit from the control flow model. (Please
>>>>>>> correct me if I'm wrong, @Yun, @Jark)
>>>>>>>   * Iteration: When a certain condition is met, we might want to
>>>>>>> signal downstream operators with an event
>>>>>>>   * Mini-batch assembling: Flink currently uses special watermarks
>>>>>>> for indicating the end of each mini-batch, which makes it tricky to deal
>>>>>>> with event time related computations.
>>>>>>>   * Hive dimension table join: For periodically reloaded hive
>>>>>>> tables, it would be helpful to have specific events signaling that a
>>>>>>> reloading is finished.
>>>>>>>   * Bootstrap dimension table join: This is similar to the previous
>>>>>>> one. In cases where we want to fully load the dimension table before
>>>>>>> starting joining the mainstream, it would be helpful to have an event
>>>>>>> signaling the finishing of the bootstrap.
>>>>>>>
>>>>>>> ## Dynamic REST controlling
>>>>>>> Back to the specific feature that Jiangang proposed, I personally
>>>>>>> think it's quite convenient. Currently, to dynamically change the behavior
>>>>>>> of an operator, we need to set up a separate source for the control events
>>>>>>> and leverage broadcast state. Being able to send the events via REST APIs
>>>>>>> definitely improves the usability.
>>>>>>>
>>>>>>> Leveraging dynamic configuration frameworks is for sure one possible
>>>>>>> approach. The reason we are in favor of introducing the control flow is
>>>>>>> that:
>>>>>>> - It benefits not only this specific dynamic controlling feature,
>>>>>>> but potentially other future features as well.
>>>>>>> - AFAICS, it's non-trivial to make a 3rd-party dynamic configuration
>>>>>>> framework work together with Flink's consistency mechanism.
>>>>>>>
>>>>>>> Thank you~
>>>>>>> Xintong Song
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]
>>>>>>> <http://user/SendEmail.jtp?type=node&node=44245&i=0>> wrote:
>>>>>>>
>>>>>>>> Thank you for the reply. I have checked the post you mentioned. The
>>>>>>>> dynamic config may be useful sometimes. But it is hard to keep data
>>>>>>>> consistent in flink, for example, what if the dynamic config will take
>>>>>>>> effect when failover. Since dynamic config is a desire for users, maybe
>>>>>>>> flink can support it in some way.
>>>>>>>>
>>>>>>>> For the control mode, dynamic config is just one of the control
>>>>>>>> modes. In the google doc, I have list some other cases. For example,
>>>>>>>> control events are generated in operators or external services. Besides
>>>>>>>> user's dynamic config, flink system can support some common dynamic
>>>>>>>> configuration, like qps limit, checkpoint control and so on.
>>>>>>>>
>>>>>>>> It needs good design to handle the control mode structure. Based on
>>>>>>>> that, other control features can be added easily later, like changing log
>>>>>>>> level when job is running. In the end, flink will not just process data,
>>>>>>>> but also interact with users to receive control events like a service.
>>>>>>>>
>>>>>>>> Steven Wu <[hidden email]
>>>>>>>> <http://user/SendEmail.jtp?type=node&node=44245&i=1>> 于2021年6月4日周五
>>>>>>>> 下午11:11写道:
>>>>>>>>
>>>>>>>>> I am not sure if we should solve this problem in Flink. This is
>>>>>>>>> more like a dynamic config problem that probably should be solved by some
>>>>>>>>> configuration framework. Here is one post from google search:
>>>>>>>>> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>>>>>>>>>
>>>>>>>>> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 <[hidden email]
>>>>>>>>> <http://user/SendEmail.jtp?type=node&node=44245&i=2>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>>       Flink jobs are always long-running. When the job is
>>>>>>>>>> running, users may want to control the job but not stop it. The control
>>>>>>>>>> reasons can be different as following:
>>>>>>>>>>
>>>>>>>>>>    1. Change data processing’ logic, such as filter condition.
>>>>>>>>>>    2. Send trigger events to make the progress forward.
>>>>>>>>>>    3. Define some tools to degrade the job, such as limit input
>>>>>>>>>>    qps, sampling data.
>>>>>>>>>>    4. Change log level to debug current problem.
>>>>>>>>>>
>>>>>>>>>>       The common way to do this is to stop the job, do
>>>>>>>>>> modifications and start the job. It may take a long time to recover. In
>>>>>>>>>> some situations, stopping jobs is intolerable, for example, the job is
>>>>>>>>>> related to money or important activities.So we need some
>>>>>>>>>> technologies to control the running job without stopping the job.
>>>>>>>>>>
>>>>>>>>>> We propose to add control mode for flink. A control mode based on
>>>>>>>>>> the restful interface is first introduced. It works by these steps:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>    1. The user can predefine some logic which supports config
>>>>>>>>>>    control, such as filter condition.
>>>>>>>>>>    2. Run the job.
>>>>>>>>>>    3. If the user wants to change the job's running logic, just
>>>>>>>>>>    send a restful request with the responding config.
>>>>>>>>>>
>>>>>>>>>> Other control modes will also be considered in the future. More
>>>>>>>>>> introduction can refer to the doc
>>>>>>>>>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>>>>>>>>>> . If the community likes the proposal, more discussion is needed and a more
>>>>>>>>>> detailed design will be given later. Any suggestions and ideas are welcome.
>>>>>>>>>>
>>>>>>>>>>

Re: Add control mode for flink

Jary Zhen
Big +1 for this feature:
  1. Reset the Kafka offset in certain cases.
  2. Stop checkpointing in certain cases.
  3. Change the log level for debugging.

刘建刚 <[hidden email]> 于2021年6月11日周五 下午12:17写道:
    Thanks for all the discussions and suggestions. Since the topic has been discussed for about a week, it is time to have a conclusion and new ideas are welcomed at the same time.
    First, the topic starts with use cases in restful interface. The restful interface supported many useful interactions with users, for example as follows. It is an easy way to control the job compared with broadcast api.
  1. Change data processing’ logic by dynamic configs, such as filter condition.
  2. Define some tools to control the job, such as QPS limit, sampling, change log level and so on.
    Second, we broaden the topic to control flow in order to support all kinds of control events besides the above user cases. There is a strong demand to support custom (broadcast) events for iteration, SQL control events and so on. As Xintong Song said, the key to the control flow lies as follows:
  1. Who (which component) is responsible for generating the control messages? It may be the jobmaster by some ways, the inner operator and so on.
  2. Who (which component) is responsible for reacting to the messages. 
  3. How do the messages propagate? Flink should support sending control messages by channels.
  4. When it comes to affecting the computation logics, how should the control flow work together with the exact-once consistency.  To use the checkpoint mechanism, control messages flowing from source to down tasks may be a good idea.
    Third, a common and flexible control flow design requires good design and implementation as a base. Future features and existing features should both be considered. For future features, a common restful interface is first needed to support dynamic configs. For existing features, There exist checkpoint barriers, watermark and latency marker. They have some special behaviors but also share a lot in common. The common logic should be considered but maybe they should remain unchanged until the control flow is stable.
    Some other problems as follows:
  1. How to persist the control signals when the jobmaster fails? An idea is to persist control signals in HighAvailabilityServices and replay them after failover. The restful request should be non-blocking.
  2. Should all the operators receive the control messages? All operators should have the ability to receive upper operators' control messages but maybe not process them. If we want to persist the control message state, all the subtasks belonging to one operator should have the same control events in order to rescale easily.
    For the next step, I will draft a FLIP with the scope of common control flow framework. More discussions, ideas and problems are still welcome. 

Thank you~

Jiangang Liu








Xintong Song <[hidden email]> 于2021年6月9日周三 下午12:01写道:
>
> 2. There are two kinds of existing special elements, special stream
> records (e.g. watermarks) and events (e.g. checkpoint barrier). They all
> flow through the whole DAG, but events needs to be acknowledged by
> downstream and can overtake records, while stream records are not). So I’m
> wondering if we plan to unify the two approaches in the new control flow
> (as Xintong mentioned both in the previous mails)?
>

TBH, I don't really know yet. We feel that the control flow is a
non-trivial topic and it would be better to bring it up publicly as early
as possible, while the concrete plan is still on the way.

Personally, I'm leaning towards not touching the existing watermarks and
checkpoint barriers in the first step.
- I'd expect the control flow to be introduced as an experimental feature
that takes time to stabilize. It would be better that the existing
important features like checkpointing and watermarks stay unaffected.
- Checkpoint barriers are a little different, as other control messages
somehow rely on it to achieve exactly once consistency. Without the
concrete design, I'm not entirely sure whether it can be properly modeled
as a special case of general control messages.
- Watermarks are probably similar to the other control messages. However,
it's already exposed to users as public APIs. If we want to migrate it to
the new control flow, we'd be very careful not to break any compatibility.


Thank you~

Xintong Song



On Wed, Jun 9, 2021 at 11:30 AM Steven Wu <[hidden email]> wrote:

> > producing control events from JobMaster is similar to triggering a
> savepoint.
>
> Paul, here is what I see the difference. Upon job or jobmanager recovery,
> we don't need to recover and replay the savepoint trigger signal.
>
> On Tue, Jun 8, 2021 at 8:20 PM Paul Lam <[hidden email]> wrote:
>
>> +1 for this feature. Setting up a separate control stream is too much for
>> many use cases, it would very helpful if users can leverage the built-in
>> control flow of Flink.
>>
>> My 2 cents:
>> 1. @Steven IMHO, producing control events from JobMaster is similar to
>> triggering a savepoint. The REST api is non-blocking, and users should poll
>> the results to confirm the operation is succeeded. If something goes wrong,
>> it’s user’s responsibility to retry.
>> 2. There are two kinds of existing special elements, special stream
>> records (e.g. watermarks) and events (e.g. checkpoint barrier). They all
>> flow through the whole DAG, but events needs to be acknowledged by
>> downstream and can overtake records, while stream records are not). So I’m
>> wondering if we plan to unify the two approaches in the new control flow
>> (as Xintong mentioned both in the previous mails)?
>>
>> Best,
>> Paul Lam
>>
>> 2021年6月8日 14:08,Steven Wu <[hidden email]> 写道:
>>
>>
>> I can see the benefits of control flow. E.g., it might help the old (and
>> inactive) FLIP-17 side input. I would suggest that we add more details of
>> some of the potential use cases.
>>
>> Here is one mismatch with using control flow for dynamic config. Dynamic
>> config is typically targeted/loaded by one specific operator. Control flow
>> will propagate the dynamic config to all operators. not a problem per se
>>
>> Regarding using the REST api (to jobmanager) for accepting control
>> signals from external system, where are we going to persist/checkpoint the
>> signal? jobmanager can die before the control signal is propagated and
>> checkpointed. Did we lose the control signal in this case?
>>
>>
>> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <[hidden email]>
>> wrote:
>>
>>> +1 on separating the effort into two steps:
>>>
>>>    1. Introduce a common control flow framework, with flexible
>>>    interfaces for generating / reacting to control messages for various
>>>    purposes.
>>>    2. Features that leverating the control flow can be worked on
>>>    concurrently
>>>
>>> Meantime, keeping collecting potential features that may leverage the
>>> control flow should be helpful. It provides good inputs for the control
>>> flow framework design, to make the framework common enough to cover the
>>> potential use cases.
>>>
>>> My suggestions on the next steps:
>>>
>>>    1. Allow more time for opinions to be heard and potential use cases
>>>    to be collected
>>>    2. Draft a FLIP with the scope of common control flow framework
>>>    3. We probably need a poc implementation to make sure the framework
>>>    covers at least the following scenarios
>>>       1. Produce control events from arbitrary operators
>>>       2. Produce control events from JobMaster
>>>       3. Consume control events from arbitrary operators downstream
>>>       where the events are produced
>>>
>>>
>>> Thank you~
>>> Xintong Song
>>>
>>>
>>>
>>> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <[hidden email]> wrote:
>>>
>>>> Many thanks to Jiangang for bringing this up, and thanks for the
>>>> discussion!
>>>>
>>>> I also agree with the summarization by Xintong and Jing that control
>>>> flow seems to be
>>>> a common building block for many functionalities, and a dynamic
>>>> configuration framework
>>>> is a representative application that is frequently requested by users.
>>>> Regarding the control flow,
>>>> currently we are also considering the design of iteration for
>>>> flink-ml, and as Xintong has pointed
>>>> out, it also requires the control flow in cases like detecting global
>>>> termination inside the iteration
>>>> (in this case we need to broadcast an event through the iteration
>>>> body to detect if there are still
>>>> records residing in the iteration body). And regarding whether to
>>>> implement the dynamic configuration
>>>> framework, I also agree with Xintong that the consistency guarantee
>>>> would be a point to consider; we
>>>> might consider whether we need to ensure every operator could receive the
>>>> dynamic configuration.
>>>>
>>>> Best,
>>>> Yun
>>>>
>>>>
>>>>
>>>> ------------------------------------------------------------------
>>>> Sender:kai wang<[hidden email]>
>>>> Date:2021/06/08 11:52:12
>>>> Recipient:JING ZHANG<[hidden email]>
>>>> Cc:刘建刚<[hidden email]>; Xintong Song [via Apache Flink User
>>>> Mailing List archive.]<[hidden email]>; user<
>>>> [hidden email]>; dev<[hidden email]>
>>>> Subject: Re: Add control mode for flink
>>>>
>>>>
>>>>
>>>> I'm big +1 for this feature.
>>>>
>>>>    1. Limit the input qps.
>>>>    2. Change log level for debug.
>>>>
>>>> In my team, the two examples above are needed.
>>>>
>>>> JING ZHANG <[hidden email]> wrote on Tue, Jun 8, 2021 at 11:18 AM:
>>>>
>>>>> Thanks Jiangang for bringing this up.
>>>>> As mentioned in Jiangang's email, `dynamic configuration framework`
>>>>> provides many useful functions in Kuaishou, because it could update job
>>>>> behavior without relaunching the job. The functions are very popular in
>>>>> Kuaishou; we also see similar demands on the mailing list [1].
>>>>>
>>>>> I'm big +1 for this feature.
>>>>>
>>>>> Thanks Xintong and Yun for deep thoughts about the issue. I like the
>>>>> idea about introducing control mode in Flink.
>>>>> It takes the original issue a big step closer to its essence, which also
>>>>> provides the possibility for more fantastic features, as mentioned in
>>>>> Xintong's and Jark's responses.
>>>>> Based on the idea, there are at least two milestones to achieve the
>>>>> goals which were proposed by Jiangang:
>>>>> (1) Build a common control flow framework in Flink.
>>>>>      It focuses on control flow propagation. And, how to integrate the
>>>>> common control flow framework with existing mechanisms.
>>>>> (2) Build a dynamic configuration framework which is exposed to users
>>>>> directly.
>>>>>      We could see the dynamic configuration framework as an application
>>>>> built on top of the underlying control flow framework.
>>>>>      It focuses on the Public API which receives configuration
>>>>> updating requests from users. Besides, it is necessary to introduce an API
>>>>> protection mechanism to avoid job performance degradation caused by too
>>>>> many control events.
>>>>>
>>>>> I suggest splitting the whole design into two parts after we reach a
>>>>> consensus on whether to introduce this feature, because both of these
>>>>> sub-topics need careful design.
>>>>>
>>>>>
>>>>> [
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html
>>>>> ]
>>>>>
>>>>> Best regards,
>>>>> JING ZHANG
>>>>>
>>>>> 刘建刚 <[hidden email]> wrote on Tue, Jun 8, 2021 at 10:01 AM:
>>>>>
>>>>>> Thanks Xintong Song for the detailed supplement. Since Flink jobs are
>>>>>> long-running, they are similar to many services, so interacting with or
>>>>>> controlling them is a common desire. This was our initial thought when
>>>>>> implementing the feature. In our internal Flink, many configs set in yaml
>>>>>> can be adjusted dynamically to avoid restarting the job, for example:
>>>>>>
>>>>>>    1. Limit the input qps.
>>>>>>    2. Degrade the job by sampling and so on.
>>>>>>    3. Reset kafka offset in certain cases.
>>>>>>    4. Stop checkpoint in certain cases.
>>>>>>    5. Control the consumption of historical data.
>>>>>>    6. Change the log level for debugging.
>>>>>>
>>>>>>
>>>>>> After deep discussion, we realize that a common control flow
>>>>>> will benefit both users and developers. Dynamic config is just one of the
>>>>>> use cases. For the concrete design and implementation, it relates to many
>>>>>> components, like the jobmaster, network channels, operators and so on, which
>>>>>> need deeper consideration and design.
>>>>>>
>>>>>> Xintong Song [via Apache Flink User Mailing List archive.] <
>>>>>> [hidden email]> wrote on Mon, Jun 7, 2021 at 2:52 PM:
>>>>>>
>>>>>>> Thanks Jiangang for bringing this up, and Steven & Peter for the
>>>>>>> feedback.
>>>>>>>
>>>>>>> I was part of the preliminary offline discussions before this
>>>>>>> proposal went public. So maybe I can help clarify things a bit.
>>>>>>>
>>>>>>> In short, although the phrase "control mode" might be a bit
>>>>>>> misleading, what we truly want to do is to make the concept of
>>>>>>> "control flow" explicit and expose it to users.
>>>>>>>
>>>>>>> ## Background
>>>>>>> Jiangang & his colleagues at Kuaishou maintain an internal version
>>>>>>> of Flink. One of their custom features is allowing dynamically changing
>>>>>>> operator behaviors via the REST APIs. He's willing to contribute this
>>>>>>> feature to the community, and came to Yun Gao and me for suggestions. After
>>>>>>> discussion, we feel that the underlying question to be answered is how do
>>>>>>> we model the control flow in Flink. Dynamically controlling jobs via REST
>>>>>>> API can be one of the features built on top of the control flow, and there
>>>>>>> could be others.
>>>>>>>
>>>>>>> ## Control flow
>>>>>>> Control flow refers to the communication channels for sending
>>>>>>> events/signals to/between tasks/operators, which change Flink's behavior in
>>>>>>> a way that may or may not affect the computation logic. Typical control
>>>>>>> events/signals Flink currently has are watermarks and checkpoint barriers.
>>>>>>>
>>>>>>> In general, for modeling control flow, the following questions
>>>>>>> should be considered.
>>>>>>> 1. Who (which component) is responsible for generating the control
>>>>>>> messages?
>>>>>>> 2. Who (which component) is responsible for reacting to the messages?
>>>>>>> 3. How do the messages propagate?
>>>>>>> 4. When it comes to affecting the computation logic, how should the
>>>>>>> control flow work together with exactly-once consistency?
>>>>>>>
>>>>>>> 1) & 2) may vary depending on the use cases, while 3) & 4) probably
>>>>>>> share many things in common. A unified control flow model would help
>>>>>>> deduplicate the common logic, allowing us to focus on the
>>>>>>> use-case-specific parts.
>>>>>>>
>>>>>>> E.g.,
>>>>>>> - Watermarks: generated by source operators, handled by window
>>>>>>> operators.
>>>>>>> - Checkpoint barrier: generated by the checkpoint coordinator,
>>>>>>> handled by all tasks
>>>>>>> - Dynamic controlling: generated by JobMaster (in reaction to the
>>>>>>> REST command), handled by specific operators/UDFs
>>>>>>> - Operator defined events: The following features are still in
>>>>>>> planning, but may potentially benefit from the control flow model. (Please
>>>>>>> correct me if I'm wrong, @Yun, @Jark)
>>>>>>>   * Iteration: When a certain condition is met, we might want to
>>>>>>> signal downstream operators with an event
>>>>>>>   * Mini-batch assembling: Flink currently uses special watermarks
>>>>>>> for indicating the end of each mini-batch, which makes it tricky to deal
>>>>>>> with event time related computations.
>>>>>>>   * Hive dimension table join: For periodically reloaded hive
>>>>>>> tables, it would be helpful to have specific events signaling that a
>>>>>>> reloading is finished.
>>>>>>>   * Bootstrap dimension table join: This is similar to the previous
>>>>>>> one. In cases where we want to fully load the dimension table before
>>>>>>> starting joining the mainstream, it would be helpful to have an event
>>>>>>> signaling the finishing of the bootstrap.
>>>>>>>
>>>>>>> ## Dynamic REST controlling
>>>>>>> Back to the specific feature that Jiangang proposed, I personally
>>>>>>> think it's quite convenient. Currently, to dynamically change the behavior
>>>>>>> of an operator, we need to set up a separate source for the control events
>>>>>>> and leverage broadcast state. Being able to send the events via REST APIs
>>>>>>> definitely improves the usability.
>>>>>>>
>>>>>>> Leveraging dynamic configuration frameworks is for sure one possible
>>>>>>> approach. The reason we are in favor of introducing the control flow is
>>>>>>> that:
>>>>>>> - It benefits not only this specific dynamic controlling feature,
>>>>>>> but potentially other future features as well.
>>>>>>> - AFAICS, it's non-trivial to make a 3rd-party dynamic configuration
>>>>>>> framework work together with Flink's consistency mechanism.
>>>>>>>
>>>>>>> Thank you~
>>>>>>> Xintong Song
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]
>>>>>>> <http://user/SendEmail.jtp?type=node&node=44245&i=0>> wrote:
>>>>>>>
>>>>>>>> Thank you for the reply. I have checked the post you mentioned. The
>>>>>>>> dynamic config may be useful sometimes, but it is hard to keep data
>>>>>>>> consistent in Flink; for example, whether the dynamic config takes
>>>>>>>> effect after a failover. Since dynamic config is a desire for users, maybe
>>>>>>>> Flink can support it in some way.
>>>>>>>>
>>>>>>>> For the control mode, dynamic config is just one of the control
>>>>>>>> modes. In the google doc, I have listed some other cases. For example,
>>>>>>>> control events can be generated in operators or external services. Besides
>>>>>>>> the user's dynamic config, the Flink system can support some common dynamic
>>>>>>>> configuration, like qps limits, checkpoint control and so on.
>>>>>>>>
>>>>>>>> The control mode structure needs a good design. Based on
>>>>>>>> that, other control features can be added easily later, like changing the
>>>>>>>> log level while the job is running. In the end, Flink will not just process
>>>>>>>> data, but also interact with users and receive control events like a service.
>>>>>>>>
>>>>>>>> Steven Wu <[hidden email]
>>>>>>>> <http://user/SendEmail.jtp?type=node&node=44245&i=1>> wrote on Fri,
>>>>>>>> Jun 4, 2021 at 11:11 PM:
>>>>>>>>
>>>>>>>>> I am not sure if we should solve this problem in Flink. This is
>>>>>>>>> more like a dynamic config problem that probably should be solved by some
>>>>>>>>> configuration framework. Here is one post from google search:
>>>>>>>>> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>>>>>>>>>
>>>>>>>>> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 <[hidden email]
>>>>>>>>> <http://user/SendEmail.jtp?type=node&node=44245&i=2>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>>       Flink jobs are always long-running. When the job is
>>>>>>>>>> running, users may want to control the job but not stop it. The control
>>>>>>>>>> reasons can differ, as follows:
>>>>>>>>>>
>>>>>>>>>>    1. Change the data processing logic, such as a filter condition.
>>>>>>>>>>    2. Send trigger events to make the progress move forward.
>>>>>>>>>>    3. Define some tools to degrade the job, such as limiting input
>>>>>>>>>>    qps or sampling data.
>>>>>>>>>>    4. Change the log level to debug the current problem.
>>>>>>>>>>
>>>>>>>>>>       The common way to do this is to stop the job, do the
>>>>>>>>>> modifications and start the job again. It may take a long time to recover. In
>>>>>>>>>> some situations, stopping jobs is intolerable, for example when the job is
>>>>>>>>>> related to money or important activities. So we need some
>>>>>>>>>> technologies to control the running job without stopping it.
>>>>>>>>>>
>>>>>>>>>> We propose to add a control mode for Flink. A control mode based on
>>>>>>>>>> the restful interface is introduced first. It works by these steps:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>    1. The user can predefine some logic which supports config
>>>>>>>>>>    control, such as filter condition.
>>>>>>>>>>    2. Run the job.
>>>>>>>>>>    3. If the user wants to change the job's running logic, just
>>>>>>>>>>    send a restful request with the corresponding config.
>>>>>>>>>>
>>>>>>>>>> Other control modes will also be considered in the future. More
>>>>>>>>>> details can be found in the doc
>>>>>>>>>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>>>>>>>>>> . If the community likes the proposal, more discussion is needed and a more
>>>>>>>>>> detailed design will be given later. Any suggestions and ideas are welcome.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Add control mode for flink

Till Rohrmann
Thanks for starting this discussion. I do see the benefit of dynamically configuring your Flink job and the cluster running it. Some of the use cases mentioned here are already possible. E.g. adjusting the log level dynamically can be done by configuring an appropriate logging backend and then changing the logging properties (log4j 2 supports this, for example). The remaining use cases fall into two categories:

1) changing the job
2) changing the cluster configuration 

1) would benefit from general control flow events which will be processed by all operators. 2) would require some component sending some control events to the other Flink processes.

Implementing the control flow events can already be done to a good extent on the user level by using a connected stream and a user-level record type which can distinguish between control events and normal records. Admittedly, it is a bit of work, though.
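As a toy illustration of that user-level pattern (plain Python with made-up names, not Flink API code; in Flink itself this would be a ConnectedStreams with a CoFlatMapFunction, or broadcast state), a single merged stream carries both control events and normal records, and the operator reacts to the former while processing the latter:

```python
from dataclasses import dataclass
from typing import Iterator, Union

@dataclass
class Record:            # a normal data element
    value: int

@dataclass
class ControlEvent:      # a control element, e.g. a new filter threshold
    threshold: int

Element = Union[Record, ControlEvent]

class FilterOperator:
    """Keeps records whose value is >= the current threshold;
    control events arriving in-stream update that threshold."""
    def __init__(self, threshold: int = 0):
        self.threshold = threshold

    def process(self, stream: Iterator[Element]) -> Iterator[int]:
        for element in stream:
            if isinstance(element, ControlEvent):
                self.threshold = element.threshold   # react to the control event
            elif element.value >= self.threshold:
                yield element.value                  # normal record processing

op = FilterOperator()
merged = [Record(1), Record(5), ControlEvent(threshold=4), Record(3), Record(9)]
out = list(op.process(iter(merged)))
assert out == [1, 5, 9]   # Record(3) is dropped after the threshold changed to 4
```

The union-type plumbing and the per-operator reaction logic are exactly the boilerplate every such user-level implementation has to repeat, which is the "bit of work" mentioned above.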

I think persisting all of these changes would be very important because otherwise you might easily end up in an inconsistent state. For example, assume you have changed the log level and now a subset of the TaskManagers needs to be restarted. All of a sudden, some TaskManagers log on level X and the others on level Y. The same applies to job changes. A regional failover would have to restore the latest dynamically configured state. All in all, this looks like a very complex and complicated task.

On the other hand, most of the described use cases should be realizable with a restart of a job. So if Flink were able to quickly resume a job, then we would probably not need this feature. Applying the changes to the Flink and the job configuration and resubmitting the job would do the trick. Hence, improving Flink's recovery speed could be an alternative approach to this problem.

Cheers,
Till

On Fri, Jun 11, 2021 at 9:51 AM Jary Zhen <[hidden email]> wrote:
 big +1 for this feature,
  1. Reset kafka offset in certain cases.
  2. Stop checkpoint in certain cases.
  3. Change log level for debug.

刘建刚 <[hidden email]> wrote on Fri, Jun 11, 2021 at 12:17 PM:
    Thanks for all the discussions and suggestions. Since the topic has been discussed for about a week, it is time to draw a conclusion, while new ideas are welcome at the same time.
    First, the topic started with use cases for the restful interface. The restful interface supports many useful interactions with users, for example as follows. It is an easier way to control the job than the broadcast API.
  1. Change the data processing logic via dynamic configs, such as a filter condition.
  2. Define some tools to control the job, such as a QPS limit, sampling, changing the log level and so on.
    Second, we broadened the topic to control flow in order to support all kinds of control events besides the above use cases. There is a strong demand to support custom (broadcast) events for iteration, SQL control events and so on. As Xintong Song said, the key questions for the control flow are as follows:
  1. Who (which component) is responsible for generating the control messages? It may be the jobmaster, an operator itself, and so on.
  2. Who (which component) is responsible for reacting to the messages?
  3. How do the messages propagate? Flink should support sending control messages through channels.
  4. When it comes to affecting the computation logic, how should the control flow work together with exactly-once consistency? To use the checkpoint mechanism, control messages flowing from the sources to downstream tasks may be a good idea.
    Third, a common and flexible control flow requires a good design and implementation as a base. Future features and existing features should both be considered. For future features, a common restful interface is first needed to support dynamic configs. For existing features, there already exist checkpoint barriers, watermarks and latency markers. They have some special behaviors but also share a lot in common. The common logic should be considered, but maybe they should remain unchanged until the control flow is stable.
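To sketch point 4 above (a plain-Python toy with invented names, not a design proposal): when a control message travels in-band through the same channels as the records, every operator in the chain observes it exactly once and at a consistent position relative to the surrounding records, much like a checkpoint barrier:

```python
# Toy model of in-band control message propagation through a chain of operators.
class Operator:
    def __init__(self, name):
        self.name = name
        self.config = {}

    def process(self, elements):
        for el in elements:
            if isinstance(el, dict) and el.get("type") == "control":
                self.config.update(el["payload"])   # apply the config change...
                yield el                            # ...then forward it downstream
            else:
                yield el                            # normal records pass through

def run_chain(operators, elements):
    # Wire the operators into a pipeline: each consumes the previous one's output.
    stream = iter(elements)
    for op in operators:
        stream = op.process(stream)
    return list(stream)

chain = [Operator("source"), Operator("map"), Operator("sink")]
out = run_chain(chain, [1, {"type": "control", "payload": {"qps": 100}}, 2])
# Every operator saw the control message once, in stream order.
assert all(op.config == {"qps": 100} for op in chain)
assert out[0] == 1 and out[2] == 2
```

The toy ignores parallelism and multi-input operators, where barrier-style alignment would additionally be needed; it only shows why in-band propagation gives each operator a consistent cut of the stream.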
    Some other problems as follows:
  1. How to persist the control signals when the jobmaster fails? An idea is to persist control signals in HighAvailabilityServices and replay them after failover. The restful request should be non-blocking.
  2. Should all the operators receive the control messages? All operators should have the ability to receive upstream operators' control messages, but maybe not process them. If we want to persist the control message state, all the subtasks belonging to one operator should have the same control events in order to rescale easily.
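A toy model of the persist-then-acknowledge idea for problem 1 (plain Python; `SignalStore` is a hypothetical stand-in for HighAvailabilityServices, all names made up): the REST handler persists the signal before acknowledging, so a recovered jobmaster can replay signals that were accepted but never dispatched:

```python
import threading

class SignalStore:
    """Stand-in for an HA store: control signals are appended durably, in order."""
    def __init__(self):
        self.log = []
        self.lock = threading.Lock()

    def append(self, signal):
        with self.lock:
            self.log.append(signal)

class JobMaster:
    def __init__(self, store):
        self.store = store
        self.dispatched = []       # signals already propagated into the job

    def handle_rest_request(self, signal):
        # Persist first, then acknowledge; the REST call stays non-blocking
        # and the caller polls for the actual result.
        self.store.append(signal)
        return {"status": "accepted"}

    def dispatch_pending(self):
        # Propagate any persisted signals that have not been dispatched yet.
        for signal in self.store.log[len(self.dispatched):]:
            self.dispatched.append(signal)

    @classmethod
    def recover(cls, store):
        # After a jobmaster failover: rebuild from the durable log and replay.
        jm = cls(store)
        jm.dispatch_pending()
        return jm

store = SignalStore()
jm = JobMaster(store)
jm.handle_rest_request({"op": "set-log-level", "level": "DEBUG"})
# The jobmaster dies before dispatching; a new one recovers and replays.
jm2 = JobMaster.recover(store)
assert jm2.dispatched == [{"op": "set-log-level", "level": "DEBUG"}]
```

This is only the ordering argument (persist before ack, replay on recovery); deduplication against signals that were already applied by running tasks would still need a sequence number or similar.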
    For the next step, I will draft a FLIP with the scope of common control flow framework. More discussions, ideas and problems are still welcome. 

Thank you~

Jiangang Liu








Xintong Song <[hidden email]> wrote on Wed, Jun 9, 2021 at 12:01 PM:
>
> 2. There are two kinds of existing special elements: special stream
> records (e.g. watermarks) and events (e.g. checkpoint barriers). They all
> flow through the whole DAG, but events need to be acknowledged by
> downstream and can overtake records, while stream records cannot. So I'm
> wondering if we plan to unify the two approaches in the new control flow
> (as Xintong mentioned both in the previous mails)?
>

TBH, I don't really know yet. We feel that the control flow is a
non-trivial topic and it would be better to bring it up publicly as early
as possible, while the concrete plan is still on the way.

Personally, I'm leaning towards not touching the existing watermarks and
checkpoint barriers in the first step.
- I'd expect the control flow to be introduced as an experimental feature
that takes time to stabilize. It would be better if the existing
important features like checkpointing and watermarks stayed unaffected.
- Checkpoint barriers are a little different, as other control messages
somehow rely on them to achieve exactly-once consistency. Without the
concrete design, I'm not entirely sure whether they can be properly modeled
as a special case of general control messages.
- Watermarks are probably similar to the other control messages. However,
they're already exposed to users via public APIs. If we want to migrate them
to the new control flow, we'd have to be very careful not to break any
compatibility.


Thank you~

Xintong Song



On Wed, Jun 9, 2021 at 11:30 AM Steven Wu <[hidden email]> wrote:

> > producing control events from JobMaster is similar to triggering a
> savepoint.
>
> Paul, here is where I see the difference. Upon job or jobmanager recovery,
> we don't need to recover and replay the savepoint trigger signal.
>
> On Tue, Jun 8, 2021 at 8:20 PM Paul Lam <[hidden email]> wrote:
>
>> +1 for this feature. Setting up a separate control stream is too much for
>> many use cases; it would be very helpful if users could leverage the built-in
>> control flow of Flink.
>>
>> My 2 cents:
>> 1. @Steven IMHO, producing control events from JobMaster is similar to
>> triggering a savepoint. The REST api is non-blocking, and users should poll
>> the results to confirm the operation succeeded. If something goes wrong,
>> it's the user's responsibility to retry.
>> 2. There are two kinds of existing special elements: special stream
>> records (e.g. watermarks) and events (e.g. checkpoint barriers). They all
>> flow through the whole DAG, but events need to be acknowledged by
>> downstream and can overtake records, while stream records cannot. So I'm
>> wondering if we plan to unify the two approaches in the new control flow
>> (as Xintong mentioned both in the previous mails)?
>>
>> Best,
>> Paul Lam
>>
>> 2021年6月8日 14:08,Steven Wu <[hidden email]> 写道:
>>
>>
>> I can see the benefits of control flow. E.g., it might help the old (and
>> inactive) FLIP-17 side input. I would suggest that we add more details of
>> some of the potential use cases.
>>
>> Here is one mismatch with using control flow for dynamic config. Dynamic
>> config is typically targeted/loaded by one specific operator. Control flow
>> will propagate the dynamic config to all operators. not a problem per se
>>
>> Regarding using the REST api (to jobmanager) for accepting control
>> signals from external system, where are we going to persist/checkpoint the
>> signal? jobmanager can die before the control signal is propagated and
>> checkpointed. Did we lose the control signal in this case?
>>
>>
>> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <[hidden email]>
>> wrote:
>>
>>> +1 on separating the effort into two steps:
>>>
>>>    1. Introduce a common control flow framework, with flexible
>>>    interfaces for generating / reacting to control messages for various
>>>    purposes.
>>>    2. Features that leverating the control flow can be worked on
>>>    concurrently
>>>
>>> Meantime, keeping collecting potential features that may leverage the
>>> control flow should be helpful. It provides good inputs for the control
>>> flow framework design, to make the framework common enough to cover the
>>> potential use cases.
>>>
>>> My suggestions on the next steps:
>>>
>>>    1. Allow more time for opinions to be heard and potential use cases
>>>    to be collected
>>>    2. Draft a FLIP with the scope of common control flow framework
>>>    3. We probably need a poc implementation to make sure the framework
>>>    covers at least the following scenarios
>>>       1. Produce control events from arbitrary operators
>>>       2. Produce control events from JobMaster
>>>       3. Consume control events from arbitrary operators downstream
>>>       where the events are produced
>>>
>>>
>>> Thank you~
>>> Xintong Song
>>>
>>>
>>>
>>> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <[hidden email]> wrote:
>>>
>>>> Very thanks Jiangang for bringing this up and very thanks for the
>>>> discussion!
>>>>
>>>> I also agree with the summarization by Xintong and Jing that control
>>>> flow seems to be
>>>> a common buidling block for many functionalities and dynamic
>>>> configuration framework
>>>> is a representative application that frequently required by users.
>>>> Regarding the control flow,
>>>> currently we are also considering the design of iteration for the
>>>> flink-ml, and as Xintong has pointed
>>>> out, it also required the control flow in cases like detection global
>>>> termination inside the iteration
>>>>  (in this case we need to broadcast an event through the iteration
>>>> body to detect if there are still
>>>> records reside in the iteration body). And regarding  whether to
>>>> implement the dynamic configuration
>>>> framework, I also agree with Xintong that the consistency guarantee
>>>> would be a point to consider, we
>>>> might consider if we need to ensure every operator could receive the
>>>> dynamic configuration.
>>>>
>>>> Best,
>>>> Yun
>>>>
>>>>
>>>>
>>>> ------------------------------------------------------------------
>>>> Sender:kai wang<[hidden email]>
>>>> Date:2021/06/08 11:52:12
>>>> Recipient:JING ZHANG<[hidden email]>
>>>> Cc:刘建刚<[hidden email]>; Xintong Song [via Apache Flink User
>>>> Mailing List archive.]<[hidden email]>; user<
>>>> [hidden email]>; dev<[hidden email]>
>>>> Theme:Re: Add control mode for flink
>>>>
>>>>
>>>>
>>>> I'm big +1 for this feature.
>>>>
>>>>    1. Limit the input qps.
>>>>    2. Change log level for debug.
>>>>
>>>> in my team, the two examples above are needed
>>>>
>>>> JING ZHANG <[hidden email]> 于2021年6月8日周二 上午11:18写道:
>>>>
>>>>> Thanks Jiangang for bringing this up.
>>>>> As mentioned in Jiangang's email, `dynamic configuration framework`
>>>>> provides many useful functions in Kuaishou, because it could update job
>>>>> behavior without relaunching the job. The functions are very popular in
>>>>> Kuaishou, we also see similar demands in maillist [1].
>>>>>
>>>>> I'm big +1 for this feature.
>>>>>
>>>>> Thanks Xintong and Yun for deep thoughts about the issue. I like the
>>>>> idea about introducing control mode in Flink.
>>>>> It takes the original issue a big step closer to essence which also
>>>>> provides the possibility for more fantastic features as mentioned in
>>>>> Xintong and Jark's response.
>>>>> Based on the idea, there are at least two milestones to achieve the
>>>>> goals which were proposed by Jiangang:
>>>>> (1) Build a common control flow framework in Flink.
>>>>>      It focuses on control flow propagation. And, how to integrate the
>>>>> common control flow framework with existing mechanisms.
>>>>> (2) Builds a dynamic configuration framework which is exposed to users
>>>>> directly.
>>>>>      We could see dynamic configuration framework is a top application
>>>>> on the underlying control flow framework.
>>>>>      It focuses on the Public API which receives configuration
>>>>> updating requests from users. Besides, it is necessary to introduce an API
>>>>> protection mechanism to avoid job performance degradation caused by too
>>>>> many control events.
>>>>>
>>>>> I suggest splitting the whole design into two after we reach a
>>>>> consensus on whether to introduce this feature because these two sub-topic
>>>>> all need careful design.
>>>>>
>>>>>
>>>>> [
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html
>>>>> ]
>>>>>
>>>>> Best regards,
>>>>> JING ZHANG
>>>>>
>>>>> 刘建刚 <[hidden email]> 于2021年6月8日周二 上午10:01写道:
>>>>>
>>>>>> Thanks Xintong Song for the detailed supplement. Since flink is
>>>>>> long-running, it is similar to many services. So interacting with it or
>>>>>> controlling it is a common desire. This was our initial thought when
>>>>>> implementing the feature. In our inner flink, many configs used in yaml can
>>>>>> be adjusted by dynamic to avoid restarting the job, for examples as follow:
>>>>>>
>>>>>>    1. Limit the input qps.
>>>>>>    2. Degrade the job by sampling and so on.
>>>>>>    3. Reset kafka offset in certain cases.
>>>>>>    4. Stop checkpoint in certain cases.
>>>>>>    5. Control the history consuming.
>>>>>>    6. Change log level for debug.
>>>>>>
>>>>>>
>>>>>> After deep discussion, we realize that a common control flow
>>>>>> will benefit both users and developers. Dynamic config is just one of the
>>>>>> use cases. For the concrete design and implementation, it relates with many
>>>>>> components, like jobmaster, network channel, operators and so on, which
>>>>>> needs deeper consideration and design.
>>>>>>
>>>>>> Xintong Song [via Apache Flink User Mailing List archive.] <
>>>>>> [hidden email]> 于2021年6月7日周一 下午2:52写道:
>>>>>>
>>>>>>> Thanks Jiangang for bringing this up, and Steven & Peter for the
>>>>>>> feedback.
>>>>>>>
>>>>>>> I was part of the preliminary offline discussions before this
>>>>>>> proposal went public. So maybe I can help clarify things a bit.
>>>>>>>
>>>>>>> In short, despite the phrase "control mode" might be a bit
>>>>>>> misleading, what we truly want to do from my side is to make the concept of
>>>>>>> "control flow" explicit and expose it to users.
>>>>>>>
>>>>>>> ## Background
>>>>>>> Jiangang & his colleagues at Kuaishou maintain an internal version
>>>>>>> of Flink. One of their custom features is allowing dynamically changing
>>>>>>> operator behaviors via the REST APIs. He's willing to contribute this
>>>>>>> feature to the community, and came to Yun Gao and me for suggestions. After
>>>>>>> discussion, we feel that the underlying question to be answered is how do
>>>>>>> we model the control flow in Flink. Dynamically controlling jobs via REST
>>>>>>> API can be one of the features built on top of the control flow, and there
>>>>>>> could be others.
>>>>>>>
>>>>>>> ## Control flow
>>>>>>> Control flow refers to the communication channels for sending
>>>>>>> events/signals to/between tasks/operators, that changes Flink's behavior in
>>>>>>> a way that may or may not affect the computation logic. Typical control
>>>>>>> events/signals Flink currently has are watermarks and checkpoint barriers.
>>>>>>>
>>>>>>> In general, for modeling control flow, the following questions
>>>>>>> should be considered.
>>>>>>> 1. Who (which component) is responsible for generating the control
>>>>>>> messages?
>>>>>>> 2. Who (which component) is responsible for reacting to the messages.
>>>>>>> 3. How do the messages propagate?
>>>>>>> 4. When it comes to affecting the computation logics, how should the
>>>>>>> control flow work together with the exact-once consistency.
>>>>>>>
>>>>>>> 1) & 2) may vary depending on the use cases, while 3) & 4) probably
>>>>>>> share many things in common. A unified control flow model would help
>>>>>>> deduplicate the common logics, allowing us to focus on the use case
>>>>>>> specific parts.
>>>>>>>
>>>>>>> E.g.,
>>>>>>> - Watermarks: generated by source operators, handled by window
>>>>>>> operators.
>>>>>>> - Checkpoint barrier: generated by the checkpoint coordinator,
>>>>>>> handled by all tasks
>>>>>>> - Dynamic controlling: generated by JobMaster (in reaction to the
>>>>>>> REST command), handled by specific operators/UDFs
>>>>>>> - Operator defined events: The following features are still in
>>>>>>> planning, but may potentially benefit from the control flow model. (Please
>>>>>>> correct me if I'm wrong, @Yun, @Jark)
>>>>>>>   * Iteration: When a certain condition is met, we might want to
>>>>>>> signal downstream operators with an event
>>>>>>>   * Mini-batch assembling: Flink currently uses special watermarks
>>>>>>> for indicating the end of each mini-batch, which makes it tricky to deal
>>>>>>> with event time related computations.
>>>>>>>   * Hive dimension table join: For periodically reloaded hive
>>>>>>> tables, it would be helpful to have specific events signaling that a
>>>>>>> reloading is finished.
>>>>>>>   * Bootstrap dimension table join: This is similar to the previous
>>>>>>> one. In cases where we want to fully load the dimension table before
>>>>>>> starting joining the mainstream, it would be helpful to have an event
>>>>>>> signaling the finishing of the bootstrap.
>>>>>>>
>>>>>>> ## Dynamic REST controlling
>>>>>>> Back to the specific feature that Jiangang proposed, I personally
>>>>>>> think it's quite convenient. Currently, to dynamically change the behavior
>>>>>>> of an operator, we need to set up a separate source for the control events
>>>>>>> and leverage broadcast state. Being able to send the events via REST APIs
>>>>>>> definitely improves the usability.
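The broadcast-state workaround described above can be sketched in a self-contained toy form (plain Python, hypothetical names — not the actual Flink `BroadcastProcessFunction` API): a second control stream is broadcast to every parallel instance of the operator, which stores the latest condition and applies it to its share of the data.

```python
# Toy sketch of the broadcast-state approach: every parallel instance of
# the operator receives every control element and updates its local copy
# of the config. Hypothetical names, not the Flink DataStream API.

class BroadcastFilterInstance:
    def __init__(self):
        self.condition = lambda x: True    # default: pass everything

    def process_broadcast(self, new_condition):
        # The broadcast side: each instance sees every control element.
        self.condition = new_condition

    def process_data(self, record):
        # The data side: apply the current condition to this record.
        return record if self.condition(record) else None

instances = [BroadcastFilterInstance() for _ in range(3)]

# A control element arrives on the broadcast stream ...
for inst in instances:
    inst.process_broadcast(lambda x: x % 2 == 0)

# ... and all instances now apply the new filter to their partition.
results = [inst.process_data(r) for inst, r in zip(instances, [1, 2, 4])]
```

The point of the REST proposal is that the user would no longer have to wire up this extra source and broadcast plumbing by hand.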
>>>>>>>
>>>>>>> Leveraging dynamic configuration frameworks is for sure one possible
>>>>>>> approach. The reason we are in favor of introducing the control flow is
>>>>>>> that:
>>>>>>> - It benefits not only this specific dynamic controlling feature,
>>>>>>> but potentially other future features as well.
>>>>>>> - AFAICS, it's non-trivial to make a 3rd-party dynamic configuration
>>>>>>> framework work together with Flink's consistency mechanism.
>>>>>>>
>>>>>>> Thank you~
>>>>>>> Xintong Song
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]> wrote:
>>>>>>>
>>>>>>>> Thank you for the reply. I have checked the post you mentioned. A
>>>>>>>> dynamic config may be useful sometimes, but it is hard to keep data
>>>>>>>> consistent in Flink; for example, what happens if the dynamic config
>>>>>>>> takes effect during a failover? Since dynamic config is something
>>>>>>>> users want, maybe Flink can support it in some way.
>>>>>>>>
>>>>>>>> As for the control mode, dynamic config is just one of the control
>>>>>>>> modes. In the Google doc, I have listed some other cases; for
>>>>>>>> example, control events generated in operators or by external
>>>>>>>> services. Besides user-defined dynamic config, the Flink system
>>>>>>>> itself can support some common dynamic configuration, like QPS
>>>>>>>> limits, checkpoint control, and so on.
>>>>>>>>
>>>>>>>> The control mode structure needs a good design. Based on that,
>>>>>>>> other control features can be added easily later, like changing the
>>>>>>>> log level while the job is running. In the end, Flink will not just
>>>>>>>> process data, but will also interact with users and receive control
>>>>>>>> events like a service.
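The failover concern raised above comes down to whether the dynamic config is checkpointed together with the operator's data state. A toy sketch of the distinction (hypothetical names, not Flink's actual state API): if the config is part of the snapshot, a restore rolls config and data back together, so they can never diverge the way an external config store allows.

```python
# Sketch of why consistency matters for dynamic config: treating the
# config as checkpointed operator state means a failover restores config
# and processing state together. Toy model, hypothetical names.

class CheckpointedOperator:
    def __init__(self):
        self.threshold = 10      # dynamic config, treated as state
        self.count = 0           # regular processing state

    def snapshot(self):
        # Config is snapshotted together with the data state.
        return {"threshold": self.threshold, "count": self.count}

    def restore(self, checkpoint):
        self.threshold = checkpoint["threshold"]
        self.count = checkpoint["count"]

    def process(self, x):
        if x > self.threshold:
            self.count += 1

op = CheckpointedOperator()
op.process(20)
ckpt = op.snapshot()     # checkpoint taken: threshold=10, count=1

op.threshold = 3         # control event applied after the checkpoint
op.process(5)            # this record is counted under the new config

op.restore(ckpt)         # failover: config and count roll back together
```

With a third-party config framework, `threshold` would instead be reloaded from the external store on restart and could disagree with the restored `count`, which is the consistency problem mentioned in the thread.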
>>>>>>>>
>>>>>>>> On Fri, Jun 4, 2021 at 11:11 PM, Steven Wu <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> I am not sure if we should solve this problem in Flink. This is
>>>>>>>>> more like a dynamic config problem that probably should be solved by some
>>>>>>>>> configuration framework. Here is one post from google search:
>>>>>>>>> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>>>>>>>>>
>>>>>>>>> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>>       Flink jobs are usually long-running. While a job is
>>>>>>>>>> running, users may want to control the job without stopping it.
>>>>>>>>>> The reasons for control can differ, as follows:
>>>>>>>>>>
>>>>>>>>>>    1. Change the data processing logic, such as a filter
>>>>>>>>>>    condition.
>>>>>>>>>>    2. Send trigger events to move progress forward.
>>>>>>>>>>    3. Define some tools to degrade the job, such as limiting
>>>>>>>>>>    input QPS or sampling data.
>>>>>>>>>>    4. Change the log level to debug a current problem.
>>>>>>>>>>
>>>>>>>>>>       The common way to do this is to stop the job, make the
>>>>>>>>>> modifications, and start the job again. It may take a long time
>>>>>>>>>> to recover. In some situations stopping jobs is intolerable; for
>>>>>>>>>> example, when the job is related to money or important
>>>>>>>>>> activities. So we need some way to control a running job without
>>>>>>>>>> stopping it.
>>>>>>>>>>
>>>>>>>>>> We propose to add a control mode for Flink. A control mode based
>>>>>>>>>> on a RESTful interface is introduced first. It works in these
>>>>>>>>>> steps:
>>>>>>>>>>
>>>>>>>>>>    1. The user predefines some logic that supports config
>>>>>>>>>>    control, such as a filter condition.
>>>>>>>>>>    2. The user runs the job.
>>>>>>>>>>    3. If the user wants to change the job's running logic, they
>>>>>>>>>>    just send a RESTful request with the corresponding config.
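The three steps above can be sketched end to end in a self-contained toy form. Everything here is hypothetical — the endpoint shape, the `controllable_keys` declaration, and the handler are illustrations of the idea, not a proposed API:

```python
import json

# Minimal sketch of the proposal's three steps: (1) an operator
# predeclares which config keys it accepts, (2) the job runs, and
# (3) a REST-style handler applies an incoming JSON payload to the
# running job. All names are hypothetical.

class ConfigurableFilter:
    controllable_keys = {"min_value"}          # step 1: predefined control surface

    def __init__(self):
        self.min_value = 0

    def apply_config(self, key, value):
        # Only predeclared keys may be changed at runtime.
        if key in self.controllable_keys:
            setattr(self, key, value)

    def accept(self, record):
        return record >= self.min_value

job = {"filter-1": ConfigurableFilter()}       # step 2: the running job

def handle_rest_request(body):
    # step 3: e.g. POST /jobs/<id>/control with a JSON body like
    # {"operator": "filter-1", "config": {"min_value": 7}}
    req = json.loads(body)
    op = job[req["operator"]]
    for key, value in req["config"].items():
        op.apply_config(key, value)

handle_rest_request('{"operator": "filter-1", "config": {"min_value": 7}}')
```

In a real design, the JobMaster would receive such a request and deliver it to the operator instances via the control flow discussed earlier in the thread, rather than mutating the operator directly.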
>>>>>>>>>>
>>>>>>>>>> Other control modes will also be considered in the future. For a
>>>>>>>>>> fuller introduction, see the doc at
>>>>>>>>>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>>>>>>>>>> . If the community likes the proposal, more discussion is needed,
>>>>>>>>>> and a more detailed design will be given later. Any suggestions
>>>>>>>>>> and ideas are welcome.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>