Hi Flink developers,
Can I ask how we could iterate over several Kafka topics using the Kafka connector?
Our idea is like the following example:
    List<DataStream<JSONObject>> streams = new ArrayList<>();
    // Iterate over the Kafka topics and create one source per topic
    for (String topic : topicList) {
        streams.add(env.addSource(new FlinkKafkaConsumer09<>(topic,
                new JSONSchema(), properties)).rebalance());
    }
Our goal is to union several Kafka data streams into one, given the topics as a list:
    Iterator<DataStream<JSONObject>> streamsIt = streams.iterator();
    DataStream<JSONObject> currentStream = streamsIt.next();
    while (streamsIt.hasNext()) {
        DataStream<JSONObject> nextStream = streamsIt.next();
        currentStream = currentStream.union(nextStream);
    }
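To make the union step concrete, here is a minimal sketch of the same pairwise fold written without any Flink dependency. The class name UnionFold and the helper unionAll are hypothetical names used only for illustration, and plain strings stand in for the data streams:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class UnionFold {
    // Hypothetical helper: folds a non-empty list into a single value by
    // applying `union` pairwise, left to right (the same shape as the
    // while loop above).
    static <T> T unionAll(List<T> items, BinaryOperator<T> union) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("need at least one item to union");
        }
        return items.stream().reduce(union).get();
    }

    public static void main(String[] args) {
        // Strings stand in for DataStream<JSONObject>; "+" stands in for union().
        List<String> perTopic = Arrays.asList("streamA", "streamB", "streamC");
        String merged = unionAll(perTopic, (left, right) -> left + "+" + right);
        System.out.println(merged);
    }
}
```

In the job itself we would presumably call something like unionAll(streams, (a, b) -> a.union(b)); that part is an assumption on our side and untested. The explicit empty-list check is there because streamsIt.next() in the loop above would throw if topicList were empty.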
Cheers,
Sendoh