dynamic add sink to flink


dynamic add sink to flink

limbo
Hi,
    We use Flink as a router in front of our Kafka clusters: we read from one Kafka cluster and write to many different Kafka clusters and topics. However, the Flink job takes too long to start whenever we add more Kafka sinks to it. Is there any way to add sinks to Flink dynamically, or at least to make the job start faster? I suspect the slow startup is caused by the large number of Kafka sinks.

    We currently use demo code along these lines:

<code>
// Split the stream by content; one Kafka sink per selected substream.
val splitStream = stream.split(selectorByContent)  // OutputSelector tagging each record
for ((tag, sinkConfig) <- map) {
  splitStream.select(tag).addSink(new KafkaSink(sinkConfig))
}
</code>







Re: dynamic add sink to flink

Tzu-Li (Gordon) Tai
Hi,

The Flink Kafka producer can write to multiple topics besides the configured default topic.
To do this, override the `getTargetTopic` method of the `KeyedSerializationSchema` you pass to the producer.
That method is invoked for each record; if it returns a non-null topic name, the record is routed to that topic instead of the default one.
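To make this concrete, here is a minimal sketch of such a schema. The `Event` type and the topic map are hypothetical, and the trait below is a local stand-in mirroring Flink's `KeyedSerializationSchema` interface so the snippet is self-contained; in a real job you would implement the Flink trait directly and hand the schema to the producer.

<code>
// Local stand-in for org.apache.flink.streaming.util.serialization.KeyedSerializationSchema.
trait KeyedSerializationSchema[T] {
  def serializeKey(element: T): Array[Byte]
  def serializeValue(element: T): Array[Byte]
  def getTargetTopic(element: T): String // null => use the producer's default topic
}

// Hypothetical record type: the target topic is derived from the event's category.
case class Event(category: String, payload: String)

class RoutingSchema(topicByCategory: Map[String, String])
    extends KeyedSerializationSchema[Event] {
  override def serializeKey(e: Event): Array[Byte] = e.category.getBytes("UTF-8")
  override def serializeValue(e: Event): Array[Byte] = e.payload.getBytes("UTF-8")
  // Called per record; unknown categories fall back to the default topic (null).
  override def getTargetTopic(e: Event): String =
    topicByCategory.getOrElse(e.category, null)
}
</code>

With a schema like this you only need a single producer, e.g. (assuming the Kafka 0.10 connector) `stream.addSink(new FlinkKafkaProducer010[Event]("default-topic", new RoutingSchema(topics), props))`, instead of one sink per topic.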

Does this address what you have in mind?

Cheers,
Gordon

On 10 June 2017 at 6:20:59 AM, qq ([hidden email]) wrote:
