How to use multiple sources with multiple sinks

How to use multiple sources with multiple sinks

Flink Developer
How can I configure 1 Flink Job (stream execution environment, parallelism set to 10) to have multiple kafka sources where each has its' own sink to s3.

For example, let's say the sources are:
  1. Kafka Topic A - Consumer (10 partitions)
  2. Kafka Topic B - Consumer (10 partitions)
  3. Kafka Topic C - Consumer (10 partitions)
And let's say the sinks are:
  1. BucketingSink to S3 in bucket: s3://kafka_topic_a/<data files>
  2. BucketingSink to S3 in bucket: s3://kafka_topic_b/<data files>
  3. BucketingSink to S3 in bucket: s3://kafka_topic_c/<data files>
Between source 1 and sink 1 I would like to apply processing that is unique to that pipeline, and likewise between source 2 and sink 2, and between source 3 and sink 3.

How can this be achieved? Is there an example?

Re: How to use multiple sources with multiple sinks

vino yang
Hi,

If you mean a job that contains three source->sink pairs that are isolated from each other, then Flink supports this form of job.
It is not much different from a single source->sink pipeline; you simply build three DataStreams instead of one.

For example,

DataStream ds1 = xxx;   // source for topic A
ds1.addSink(xxx);       // sink for topic A

DataStream ds2 = xxx;   // source for topic B
ds2.addSink(xxx);       // sink for topic B

DataStream ds3 = xxx;   // source for topic C
ds3.addSink(xxx);       // sink for topic C
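
To make this a bit more concrete, here is a rough, self-contained sketch of such a job, assuming the Kafka 0.11 consumer (FlinkKafkaConsumer011) and the BucketingSink from flink-connector-filesystem. The broker address, group id, topic names, bucket paths and the map functions are only placeholders; replace them with your own sources, processing and sinks:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class ThreeTopicsToS3Job {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);   // matches the 10 partitions of each topic

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker address
        props.setProperty("group.id", "my-consumer-group");   // placeholder group id

        // Pipeline 1: topic A -> topic-A-specific processing -> its own S3 bucket
        DataStream<String> topicA = env.addSource(
                new FlinkKafkaConsumer011<>("topic_a", new SimpleStringSchema(), props));
        topicA.map(value -> "A: " + value)                    // stand-in for topic-A-specific logic
              .addSink(new BucketingSink<>("s3://kafka_topic_a/"));

        // Pipeline 2: topic B -> topic-B-specific processing -> its own S3 bucket
        DataStream<String> topicB = env.addSource(
                new FlinkKafkaConsumer011<>("topic_b", new SimpleStringSchema(), props));
        topicB.map(value -> "B: " + value)                    // stand-in for topic-B-specific logic
              .addSink(new BucketingSink<>("s3://kafka_topic_b/"));

        // Pipeline 3: topic C -> topic-C-specific processing -> its own S3 bucket
        DataStream<String> topicC = env.addSource(
                new FlinkKafkaConsumer011<>("topic_c", new SimpleStringSchema(), props));
        topicC.map(value -> "C: " + value)                    // stand-in for topic-C-specific logic
              .addSink(new BucketingSink<>("s3://kafka_topic_c/"));

        // All three independent pipelines run inside this single job.
        env.execute("three-topics-to-s3");
    }
}

Keep in mind that the three pipelines still share one job: parallelism, checkpointing and restarts apply to the job as a whole. If the pipelines need to be operationally independent (for example, restarting one without the others), submitting them as separate jobs is the alternative.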

Thanks, vino.
