Iterate several kafka topics using the kafka connector

Hung
Hi Flink developers,

How can we iterate over several Kafka topics using the Kafka connector?

Our idea is like the following example:

List<DataStream<JSONObject>> streams = new ArrayList<>();

// Iterate kafka topics
Iterator<String> topicIter = topicList.iterator();

while (topicIter.hasNext()) {
    String topic = topicIter.next();

    streams.add(env.addSource(new FlinkKafkaConsumer09<>(topic,
            new JSONSchema(), properties)).rebalance());
}

Our goal is to union several Kafka data streams into one, given the topics as a list:

Iterator<DataStream<JSONObject>> streamsIt = streams.iterator();

DataStream<JSONObject> currentStream = streamsIt.next();
while (streamsIt.hasNext()) {
    DataStream<JSONObject> nextStream = streamsIt.next();
    currentStream = currentStream.union(nextStream);
}
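
The union loop above is a left-to-right fold over the list of streams. As a rough sketch of that pattern only, the following uses plain Java lists as a stand-in for DataStream so that it runs without Flink on the classpath. The names UnionSketch, unionAll and concat are hypothetical and do not come from the Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class UnionSketch {

    // Folds a non-empty list into one value by applying `union` pairwise,
    // left to right: union(union(s0, s1), s2) and so on.
    static <T> T unionAll(List<T> streams, BinaryOperator<T> union) {
        return streams.stream()
                .reduce(union)
                .orElseThrow(() -> new IllegalArgumentException("no streams to union"));
    }

    // Stand-in for DataStream.union (an assumption for illustration):
    // concatenates two lists into a new one.
    static List<String> concat(List<String> a, List<String> b) {
        List<String> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> streams = Arrays.asList(
                Arrays.asList("foo-1", "foo-2"),
                Arrays.asList("bar-1"),
                Arrays.asList("foobar-1"));
        // prints [foo-1, foo-2, bar-1, foobar-1]
        System.out.println(unionAll(streams, UnionSketch::concat));
    }
}
```

With real streams, the second argument would presumably be (a, b) -> a.union(b), matching the call in the loop. Unlike the loop, which fails with NoSuchElementException on an empty list, this sketch reports the empty case with an explicit message.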

Cheers,

Sendoh

Re: Iterate several kafka topics using the kafka connector

Till Rohrmann

It is possible to instantiate the FlinkKafkaConsumer with multiple topics [1]. Simply pass a list of topic names instead of the name of a single topic.

streams.add(env.addSource(new FlinkKafkaConsumer09<>(Arrays.asList("foo", "bar", "foobar"),
                    new JSONSchema(), properties)));

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumer
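
For context, here is a minimal sketch of the inputs such a multi-topic consumer would need, written with only the Java standard library so it runs without Flink. The class name MultiTopicConfigSketch, the broker address and the group id are hypothetical placeholders; bootstrap.servers and group.id are standard Kafka consumer settings. The Flink call itself appears only as a comment.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class MultiTopicConfigSketch {

    // Consumer settings; both values are placeholders, not taken from the thread.
    static Properties consumerProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "example-group");
        return properties;
    }

    // The topic names used in the reply above.
    static List<String> topics() {
        return Arrays.asList("foo", "bar", "foobar");
    }

    public static void main(String[] args) {
        Properties properties = consumerProperties();
        List<String> topics = topics();

        // With the Flink Kafka connector on the classpath, a single source
        // would then cover all topics, with no per-topic loop or union:
        // DataStream<JSONObject> stream = env.addSource(
        //         new FlinkKafkaConsumer09<>(topics, new JSONSchema(), properties));
        System.out.println(topics.size() + " topics, group " + properties.getProperty("group.id"));
    }
}
```

Because one consumer reads every topic in the list, the result is already a single DataStream, so the intermediate list of streams is no longer needed.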

Cheers,
Till


On Thu, Jun 23, 2016 at 2:33 PM, Sendoh <[hidden email]> wrote:
Hi Flink developers,

Can I ask how could we iterate several Kafka topics using the Kafka
connector?

Our idea is like the following example:

List<DataStream<JSONObject>> streams = new ArrayList<>();

// Iterate kafka topics
Iterator<String> topicIter = topicList.iterator();

        while (topicIter.hasNext()){

            String topic = topicIter.next();

            streams.add(env.addSource(new FlinkKafkaConsumer09<>(topic,
                    new JSONSchema(), properties)).rebalance());

        }

Our goal is to union several kafka data streams into one, given the topics
as a list:

        Iterator<DataStream<JSONObject>> streamsIt = streams.iterator();

        DataStream<JSONObject> currentStream = streamsIt.next();
        while(streamsIt.hasNext()){
            DataStream<JSONObject> nextStream = streamsIt.next();
            currentStream = currentStream.union(nextStream);
        }

Cheers,

Sendoh



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Iterate-several-kafka-topics-using-the-kafka-connector-tp7673.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.


Re: Iterate several kafka topics using the kafka connector

Hung
Thank you. It works exactly as we wanted and unions the data streams.

Best,

Sendoh