When using FlinkKafkaConsumer010 to subscribe to multiple topics, as in

    List<String> topics = Arrays.asList("test1", "test2");
    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(topics, new SimpleStringSchema(), properties));

how do I get the topic name of each record in my SinkFunction, i.e. in stream.addSink()? Thanks, Richard |
Hi Richard, Producing to multiple topics is treated a bit differently in the Flink Kafka producer. You need to set a single default target topic, and in `KeyedSerializationSchema#getTargetTopic()` you can override the default topic with whatever is returned. The `getTargetTopic` method is invoked for each record. Cheers, Gordon On 6 July 2017 at 9:09:29 AM, Richard Xin ([hidden email]) wrote:
|
Thanks. I'm not sure I understand this. What I need is a single process that subscribes to multiple Kafka topics, with a switch clause for the different topics in my SinkFunction. Do I need to change how the Kafka producer produces the messages? Any pointers to code samples would be appreciated. Thanks again, Richard On Wednesday, July 5, 2017, 10:25:59 PM PDT, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
|
Hi,

Here's an example:

    DataStream<String> inputStream = …;
    inputStream.addSink(new FlinkKafkaProducer09<>(
        "defaultTopic", new CustomKeyedSerializationSchema(), props));

Code for CustomKeyedSerializationSchema:

    public class CustomKeyedSerializationSchema implements KeyedSerializationSchema<String> {
        public byte[] serializeKey(String element) {…}
        public byte[] serializeValue(String element) {…}
        public String getTargetTopic(String element) {…}
    }

`getTargetTopic` is called for each record, and whatever it returns overrides "defaultTopic". You can place your switch there.

Cheers, Gordon
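The elided method bodies in the schema above could be filled in roughly as follows. This is only a sketch under stated assumptions: the interface is a local stand-in that mirrors the three methods of Flink's `KeyedSerializationSchema<T>` so that the code compiles without Flink on the classpath, and the routing rule (records starting with "ERROR" go to a topic named "errors") is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in mirroring the methods of Flink's KeyedSerializationSchema<T>.
// In a real job you would implement the Flink interface instead.
interface KeyedSerializationSchemaSketch<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

class CustomKeyedSerializationSchema implements KeyedSerializationSchemaSketch<String> {

    @Override
    public byte[] serializeKey(String element) {
        // No message key in this sketch.
        return null;
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        // Hypothetical routing rule: pick the target topic from the record content.
        if (element.startsWith("ERROR")) {
            return "errors";
        }
        // Returning null leaves the choice to the producer's default topic.
        return null;
    }
}
```

Returning null from `getTargetTopic` makes the Flink producer fall back to the default topic passed to its constructor, so only the records that need rerouting have to name a topic.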
On 6 July 2017 at 11:43:55 PM, Richard Xin ([hidden email]) wrote:
|
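The follow-up question asks for the topic a record was read from, which is a consumer-side concern. One possible approach, sketched here and not taken from the replies above, is to attach the topic during deserialization: Flink's `KeyedDeserializationSchema<T>` receives the topic name in its `deserialize` method. In this sketch `TopicRecord`, `TopicRecordSchema` and `TopicSwitch` are hypothetical names, and the interface is a local stand-in for the Flink one (which also declares `isEndOfStream` and `getProducedType`, omitted here) so that the code compiles on its own.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical record type that carries the source topic along with the payload.
final class TopicRecord {
    final String topic;
    final String value;

    TopicRecord(String topic, String value) {
        this.topic = topic;
        this.value = value;
    }
}

// Local stand-in for the deserialize(...) method of Flink's KeyedDeserializationSchema<T>.
interface TopicAwareDeserializer<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
}

// Keeps the topic name that the consumer passes in for every record.
class TopicRecordSchema implements TopicAwareDeserializer<TopicRecord> {
    @Override
    public TopicRecord deserialize(byte[] messageKey, byte[] message,
                                   String topic, int partition, long offset) {
        return new TopicRecord(topic, new String(message, StandardCharsets.UTF_8));
    }
}

// Hypothetical sink body: the switch on the source topic that the question asks for.
class TopicSwitch {
    static String handle(TopicRecord record) {
        switch (record.topic) {
            case "test1":
                return "test1 handler got: " + record.value;
            case "test2":
                return "test2 handler got: " + record.value;
            default:
                return "unexpected topic: " + record.topic;
        }
    }
}
```

In a real job, an implementation of the Flink interface would be passed to FlinkKafkaConsumer010 in place of SimpleStringSchema, the stream would become a `DataStream<TopicRecord>`, and the switch would live in the SinkFunction's `invoke` method.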