How to broadcast messages to all task manager instances in cluster?


Di Tang
Hi guys:

I have a Flink job which contains multiple pipelines. Each pipeline depends on some configuration. I want the configuration to be dynamic and to take effect after a change, so I created a data source that periodically polls the database storing the configuration. However, how can I broadcast these events to all task manager instances? datastream.broadcast() only applies to the parallel instances of an operator. I also don't want to connect the configuration data source to each pipeline, because that is too verbose. If Flink cannot explicitly broadcast messages to task managers, is there any way to guarantee that the parallel operator is distributed across all task managers?

Thanks,
Di

Re: How to broadcast messages to all task manager instances in cluster?

Piotr Nowojski
Hi,

I don’t quite understand your problem. If you broadcast a message as an input to the operator that depends on this configuration, each parallel instance of that operator will receive the configuration. It shouldn't matter whether Flink schedules your operator on one, some, or all of the TaskManagers. All that matters is that the operators running your configuration-sensitive code receive the broadcasted message.


DataStream<> input = xxx;
DataStream<> controlConfigInput = yyy;

DataStream<> data = input
    .doSomething()
    .fancy();

controlConfigInput.broadcast()
    .connect(data)
    .flatMap(new MyFancyOperatorThatDependsOnConfigStream());
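
(For illustration only, here is a minimal sketch of what such an operator could look like as a CoFlatMapFunction. The Config and Event types and the isEnabled() method are hypothetical placeholders, not part of the original snippet.)

// Minimal sketch with hypothetical Config/Event placeholder types:
// keeps the latest broadcasted Config in a field and consults it for
// every data element. The field is not checkpointed.
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class MyFancyOperatorThatDependsOnConfigStream
        implements CoFlatMapFunction<Config, Event, Event> {

    // Latest configuration seen by this parallel instance.
    private transient Config currentConfig;

    @Override
    public void flatMap1(Config config, Collector<Event> out) {
        // Broadcasted input: every parallel instance receives every update.
        currentConfig = config;
    }

    @Override
    public void flatMap2(Event event, Collector<Event> out) {
        // Data input: process the element using the latest configuration.
        if (currentConfig == null || currentConfig.isEnabled()) {
            out.collect(event);
        }
    }
}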


Piotrek


Re: How to broadcast messages to all task manager instances in cluster?

Di Tang
Thanks Piotr for the response. I have many data streams that depend on the configuration by reading values from static variables in a class. The way a configuration change works is to update those static variables' values. Since each task manager has only one JVM process, as long as the message is broadcast to each task manager, the data streams will see the change. The logic in the data streams is quite simple: just read some parameters from the static variables. So I think adding connect and flatMap to each of them is too verbose. I am wondering whether there is a better way to express this.
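
(A purely illustrative sketch of the pattern described above, with hypothetical names, just to make the mechanism concrete:)

// Hypothetical sketch of the static-variable approach described above: some
// operator in each TaskManager JVM calls update() when it receives a new
// configuration, and the other functions in the same JVM read it via get().
public final class ConfigHolder {

    // volatile so that updates made by the config-updating operator thread
    // become visible to the other operator threads in the same JVM.
    private static volatile Config current;

    private ConfigHolder() {}

    public static Config get() {
        return current;
    }

    public static void update(Config newConfig) {
        current = newConfig;
    }
}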


Re: How to broadcast messages to all task manager instances in cluster?

Piotr Nowojski
Hi,

Thanks for the clarification. This might be tough. Generally speaking, sharing such static configuration across multiple operators/functions can pose lots of different problems, including synchronisation, fault tolerance, etc.

To be honest, you should treat such a thing almost like an external system with external state, because from Flink’s perspective that is exactly what it is: it is equivalent to having an external “configuration service” hosted/stored somewhere outside of Flink. With it you have to take care of fault tolerance (especially its state) manually, since it is outside of Flink’s control. In particular, think about what should happen to your static configuration if one of your machines fails/restarts and Flink chooses to restart only part of the job graph (possibly one, many, or all of the operators). How will your static configuration be kept in sync across all of the TaskManagers in that case?

It would be easier if you could restructure your job/problem and replace such static configuration with configuration stored in Flink’s state (maybe in one operator? Or on the parallel instances of one task?). Otherwise, to make it fully reliable, I think you would need to write quite a lot of code on your own.
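
(Purely as an illustration of that suggestion, and not from the original thread: one possible way to keep the configuration in Flink-managed operator state is to checkpoint it inside the co-flat-map function. The Config and Event types and the state name are hypothetical placeholders.)

// Sketch: a CoFlatMapFunction that stores the latest Config in Flink
// operator state so it survives restarts, instead of in a static variable.
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ConfigAwareFlatMap
        implements CoFlatMapFunction<Config, Event, Event>, CheckpointedFunction {

    private transient ListState<Config> configState;
    private Config currentConfig;

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        // Union list state: on restore, every parallel instance sees the
        // checkpointed configuration and can pick it up again.
        configState = ctx.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("current-config", Config.class));
        for (Config c : configState.get()) {
            currentConfig = c;   // take any restored copy
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        configState.clear();
        if (currentConfig != null) {
            configState.add(currentConfig);
        }
    }

    @Override
    public void flatMap1(Config config, Collector<Event> out) {
        currentConfig = config;   // broadcasted configuration update
    }

    @Override
    public void flatMap2(Event event, Collector<Event> out) {
        // Use currentConfig when processing data elements.
        out.collect(event);
    }
}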

Alternatively, you can consider using a third-party system such as Apache ZooKeeper for storing the configuration.

Piotrek
