how to get topic names in SinkFunction when using FlinkKafkaConsumer010 with multiple topics

Richard Xin
When using FlinkKafkaConsumer010 to subscribe to multiple topics, as in:

List<String> topics = Arrays.asList("test1","test2");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(topics, new SimpleStringSchema(), properties));

How do I get the topic names in my SinkFunction, i.e. in the function passed to stream.addSink()?

Thanks,
Richard
Re: how to get topic names in SinkFunction when using FlinkKafkaConsumer010 with multiple topics

Tzu-Li (Gordon) Tai
Hi Richard,

Producing to multiple topics is treated a bit differently in the Flink Kafka producer.
You need to set a single default target topic, and in `KeyedSerializationSchema#getTargetTopic()` you can override the default topic with whatever is returned. The `getTargetTopic` method is invoked for each record.

Cheers,
Gordon


Re: how to get topic names in SinkFunction when using FlinkKafkaConsumer010 with multiple topics

Richard Xin
Thanks,
I'm not sure I understand this. What I need is a single process that subscribes to multiple Kafka topics, with a switch clause for the different topics in my SinkFunction. Did you mean that I need to change how the Kafka producer produces the messages?
Any pointers to code samples would be appreciated.
Thanks again,
Richard


Re: how to get topic names in SinkFunction when using FlinkKafkaConsumer010 with multiple topics

Tzu-Li (Gordon) Tai
Hi,

Here’s an example:

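DataStream<String> inputStream = …;

inputStream.addSink(new FlinkKafkaProducer09<>(
    "defaultTopic", new CustomKeyedSerializationSchema(), props));

Code for CustomKeyedSerializationSchema:
// Note: the producer takes a KeyedSerializationSchema (not KeyedDeserializationSchema).
public class CustomKeyedSerializationSchema implements KeyedSerializationSchema<String> {
    public byte[] serializeKey(String element) {…}
    public byte[] serializeValue(String element) {…}
    // Called once per record; the returned topic overrides the default one.
    public String getTargetTopic(String element) {
      ...
    }
}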

For the above code, by default records will always be sent to the “defaultTopic” topic.
However, for each record, `getTargetTopic` will also be called. Whatever is returned from there will override “defaultTopic”.
You can place your switch there.
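
To make the "switch" concrete, here is a minimal sketch of what such a routing rule might look like. It is not taken from the Flink sources: the interface below is a local stand-in that mirrors the three methods of KeyedSerializationSchema so that the example compiles without Flink on the classpath, and the "<type>:<payload>" record format and the topic names "orders" and "clicks" are hypothetical. As far as I know, returning null from getTargetTopic makes the producer fall back to the default topic, but please check that against your Flink version.

```java
import java.nio.charset.StandardCharsets;

public class TopicRoutingSketch {

    // Local stand-in for Flink's KeyedSerializationSchema<T> (assumption:
    // with Flink available you would implement the real interface instead).
    interface KeyedSerializationSchema<T> {
        byte[] serializeKey(T element);
        byte[] serializeValue(T element);
        String getTargetTopic(T element);
    }

    // Hypothetical rule: records look like "<type>:<payload>".
    static class CustomKeyedSerializationSchema implements KeyedSerializationSchema<String> {

        @Override
        public byte[] serializeKey(String element) {
            return null; // this sketch writes records without a key
        }

        @Override
        public byte[] serializeValue(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String getTargetTopic(String element) {
            int sep = element.indexOf(':');
            String type = sep < 0 ? "" : element.substring(0, sep);
            switch (type) {
                case "order":
                    return "orders";
                case "click":
                    return "clicks";
                default:
                    return null; // no override: the default topic is used
            }
        }
    }

    public static void main(String[] args) {
        CustomKeyedSerializationSchema schema = new CustomKeyedSerializationSchema();
        for (String record : new String[] {"order:42", "click:home", "other"}) {
            System.out.println(record + " -> " + schema.getTargetTopic(record));
        }
    }
}
```

The routing decision lives entirely in getTargetTopic, so the job still has a single sink and a single default topic.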

Cheers,
Gordon
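
The original question is about the consuming side, so the following sketch may also be useful. One possible approach is to pass the consumer a KeyedDeserializationSchema instead of SimpleStringSchema: its deserialize method receives the topic name for every record, so the topic can be carried along in the stream element and inspected later in the sink. Everything below is an illustration under assumptions: the interface is a local stand-in (the real one also requires getProducedType), and TopicRecord, TopicTaggingSchema and handle are hypothetical names; handle stands in for the body of SinkFunction#invoke.

```java
import java.nio.charset.StandardCharsets;

public class TopicAwareSketch {

    // Local stand-in for Flink's KeyedDeserializationSchema<T>, reduced to
    // the methods this sketch needs (getProducedType is omitted).
    interface KeyedDeserializationSchema<T> {
        T deserialize(byte[] messageKey, byte[] message,
                      String topic, int partition, long offset);
        boolean isEndOfStream(T nextElement);
    }

    // Hypothetical element type that keeps the source topic next to the value.
    static final class TopicRecord {
        final String topic;
        final String value;

        TopicRecord(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    // Tags each record with the topic it was read from.
    static class TopicTaggingSchema implements KeyedDeserializationSchema<TopicRecord> {

        @Override
        public TopicRecord deserialize(byte[] messageKey, byte[] message,
                                       String topic, int partition, long offset) {
            return new TopicRecord(topic, new String(message, StandardCharsets.UTF_8));
        }

        @Override
        public boolean isEndOfStream(TopicRecord nextElement) {
            return false; // the stream is unbounded
        }
    }

    // Stand-in for the logic inside SinkFunction#invoke: switch on the topic.
    static String handle(TopicRecord record) {
        switch (record.topic) {
            case "test1":
                return "test1 branch: " + record.value;
            case "test2":
                return "test2 branch: " + record.value;
            default:
                return "unexpected topic: " + record.topic;
        }
    }

    public static void main(String[] args) {
        TopicTaggingSchema schema = new TopicTaggingSchema();
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(handle(schema.deserialize(null, payload, "test1", 0, 0L)));
        System.out.println(handle(schema.deserialize(null, payload, "test2", 0, 1L)));
    }
}
```

With Flink on the classpath, a schema like this would be passed to the FlinkKafkaConsumer010 constructor in place of SimpleStringSchema, and the stream would then have the element type TopicRecord rather than String.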
