Communicating with my operators

Communicating with my operators

Tom Wells
Hi Everyone

I'm looking for some advice on designing my operators (which unsurprisingly tend to take the form of SourceFunctions, ProcessFunctions or SinkFunctions) to allow them to be "dynamically configured" while running. 

By way of example, I have a SourceFunction which collects the names of various S3 buckets, and then a ProcessFunction which reads and collects their contents. The gotcha is that the list of S3 buckets is not fixed, and can change during the lifetime of the job. This add/remove action would be performed by a human administrator, let's say using a simple command line tool.

For example - here is an idea of what I want to build to "communicate" with my flink job: 

```
# Add a bucket to the flink job to process
$ ./admin-tool add-bucket --name my-s3-bucket --region eu-west-1 --access-key <blah>...

# Get a list of the s3 buckets we're currently processing, and when they were last accessed
$ ./admin-tool list-buckets
my-s3-bucket | eu-west-1 | 5 seconds ago

# Remove buckets
$ ./admin-tool remove-bucket --name my-s3-bucket
```

Hope that gives you an idea - of course this could apply to any number of different source types, and could even extend to configuring sinks too.
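
To ground the discussion, here's a minimal sketch of the static version of that pipeline - bucket list hardcoded, a SourceFunction emitting bucket names and a ProcessFunction standing in for the reader. The class names are hypothetical and the actual S3 reading is stubbed out:

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class StaticBucketsJob {

    // Hypothetical source: emits a fixed list of bucket names once.
    // Making this list changeable at runtime is the whole question.
    static class BucketListSource implements SourceFunction<String> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            ctx.collect("my-s3-bucket");
            while (running) {
                Thread.sleep(1000); // keep the job alive
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new BucketListSource())
           .process(new ProcessFunction<String, String>() {
               @Override
               public void processElement(String bucket, Context ctx, Collector<String> out) {
                   // Stub: a real job would list and read the bucket here (e.g. via the AWS SDK).
                   out.collect("contents of " + bucket);
               }
           })
           .print();
        env.execute("static-buckets");
    }
}
```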

So - how should my command line tool communicate with my operators?

5 alternative approaches I've thought about:

- Have a SourceFunction open a websocket and listen for bucket add/remove commands (written to by the command line tool). I think this would work, but the difficulty is figuring out where exactly the SourceFunction might be deployed in the Flink cluster in order to find the websocket's listening port. I took a look at the ClusterClient API and it's possibly available by inspecting the JobGraph... I'm just not sure whether this is an anti-pattern?

- Use a CoProcessFunction instead, and have it joined with a DataStream that I can somehow write to directly from the command line tool (maybe using the flink-client API - can I write to a DataStream directly??). I don't think this is possible, but it feels like it would be a good clean approach?

- Somehow using the ParameterTool. I don't think it supports a dynamic use-case though? 

- Writing directly to the saved state of a ProcessFunction to add or remove bucket names. I'm pretty unfamiliar with this approach - but it looks possible according to the docs on the State Processor API - however it seems like I would have to read the savepoint, write the updates, then restore from the savepoint, which may mean suspending and resuming the job while that happens. Not really an issue for me, but it does feel like possibly the wrong approach for my simple requirement.

- Solve it just using datasources - e.g. create a centrally read S3 bucket which holds the latest configuration and is sourced and joined by every operator (probably using Broadcast State). My command line tool would then just have to write to that S3 bucket - no need to communicate directly with the operators. (See the sketch after this list.)
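
A rough sketch of the Broadcast State wiring that last option implies. The S3-polling config source is stubbed out (S3ConfigPollingSource is a hypothetical name), and the "add:<bucket>" / "remove:<bucket>" update format is invented purely for illustration:

```
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class BroadcastConfigJob {

    // Descriptor for the broadcast state that holds the current bucket set.
    static final MapStateDescriptor<String, String> CONFIG =
        new MapStateDescriptor<>("bucket-config",
            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    // Stub for a source that would periodically re-read the config object
    // from S3 and emit "add:<bucket>" / "remove:<bucket>" updates.
    static class S3ConfigPollingSource implements SourceFunction<String> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            ctx.collect("add:my-s3-bucket"); // stand-in for a real S3 poll loop
            while (running) {
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        BroadcastStream<String> config =
            env.addSource(new S3ConfigPollingSource()).broadcast(CONFIG);

        // Stand-in for the main data stream the job actually processes.
        DataStream<String> events = env.fromElements("my-s3-bucket", "other-bucket");

        events.connect(config)
              .process(new BroadcastProcessFunction<String, String, String>() {
                  @Override
                  public void processBroadcastElement(String update, Context ctx,
                                                      Collector<String> out) throws Exception {
                      // Apply add/remove commands to the (writable) broadcast state.
                      String[] parts = update.split(":", 2);
                      if ("add".equals(parts[0])) {
                          ctx.getBroadcastState(CONFIG).put(parts[1], parts[1]);
                      } else {
                          ctx.getBroadcastState(CONFIG).remove(parts[1]);
                      }
                  }

                  @Override
                  public void processElement(String value, ReadOnlyContext ctx,
                                             Collector<String> out) throws Exception {
                      // On the data path the broadcast state is read-only:
                      // only act on buckets that are currently configured.
                      if (ctx.getBroadcastState(CONFIG).contains(value)) {
                          out.collect(value);
                      }
                  }
              })
              .print();

        env.execute("broadcast-config");
    }
}
```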

The last option is fairly obvious and probably my default approach - I'm just wondering whether any of the alternatives above are worth investigating. (Especially considering my endless quest to learn everything about Flink - I don't mind exploring the less obvious pathways.)

I would love some guidance or advice on what you feel is the best / most idiomatic approach for this.

All the best,
Tom
Re: Communicating with my operators

Chesnay Schepler
Using an S3 bucket containing the configuration is the way to go.

1) Web sockets, or more generally all approaches where you connect to the source

The JobGraph won't help you; it doesn't contain information about where tasks are deployed at runtime. It is just an abstract representation of your job.

You could theoretically retrieve the actual location through the REST API, and maybe expose the port as a metric.

But then you still have to deal with resolving IPs, internal/external IPs and all that jazz.
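
For what it's worth, a hedged sketch of that location lookup against Flink's monitoring REST API - the jobmanager address and the IDs are placeholders, and you'd still need a JSON library to pull the per-subtask "host" fields out of the response:

```
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TaskLocator {
    public static void main(String[] args) throws Exception {
        // Placeholders: point these at your cluster and job.
        String jobManager = "http://localhost:8081";
        String jobId = args[0];
        String vertexId = args[1]; // the source operator's vertex id

        // GET /jobs/:jobid/vertices/:vertexid returns per-subtask details,
        // including the host each subtask is deployed on.
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(jobManager + "/jobs/" + jobId + "/vertices/" + vertexId))
            .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());

        // Raw JSON; parse the "subtasks" array with your JSON library of choice.
        System.out.println(response.body());
    }
}
```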

2) CoProcessFunction

We still have to get the data in somehow; so you'd need to have some source in any case :)

3) ParameterTool

This is really just a parsing tool, so it won't help for this use-case.

4) State Processing API

A bit too complicated. If restarting jobs is an option, you could just encode the commands into the source, emit them as events of sorts, and have the process function update its state on reception of these events.
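
A small sketch of that suggestion, under the assumption that commands reach the source somehow (e.g. baked into job parameters at restart): the source emits command events, and a keyed process function applies them to its state. BucketCommand, its fields, and the "active" flag are all made up for illustration:

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical command event emitted by the source.
class BucketCommand {
    public String action; // "add" or "remove"
    public String bucket;

    public BucketCommand() {}

    public BucketCommand(String action, String bucket) {
        this.action = action;
        this.bucket = bucket;
    }
}

// Keyed by bucket name; keeps an "active" flag per bucket and updates it
// whenever a command event arrives. Wired up as:
//   commands.keyBy(cmd -> cmd.bucket).process(new BucketCommandApplier())
class BucketCommandApplier extends KeyedProcessFunction<String, BucketCommand, String> {
    private transient ValueState<Boolean> active;

    @Override
    public void open(Configuration parameters) {
        active = getRuntimeContext().getState(
            new ValueStateDescriptor<>("active", Boolean.class));
    }

    @Override
    public void processElement(BucketCommand cmd, Context ctx, Collector<String> out)
            throws Exception {
        if ("add".equals(cmd.action)) {
            active.update(true);
            out.collect("now watching " + cmd.bucket);
        } else if ("remove".equals(cmd.action)) {
            active.clear();
            out.collect("stopped watching " + cmd.bucket);
        }
    }
}
```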
