Hi Ben,
if I remember correctly, this topic has come up a couple of times. We
haven't implemented it yet, but the existing implementation can easily be
adapted for it. In SQL terms, the "target topic" would be an additional
persisted metadata column. All you need to do is adapt
DynamicKafkaSerializationSchema and KafkaDynamicSink accordingly.
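To illustrate the idea, here is a minimal, dependency-free sketch of per-record
topic routing. It is not the connector code: TopicRoutingSketch, RoutedRecord,
serialize, and the column name "target_topic" are hypothetical names made up
for this example, and RoutedRecord only stands in for Kafka's ProducerRecord.
The real change would live in the two classes mentioned above.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class TopicRoutingSketch {

    /** Hypothetical stand-in for Kafka's ProducerRecord: a target topic plus a payload. */
    static final class RoutedRecord {
        final String topic;
        final byte[] value;

        RoutedRecord(String topic, byte[] value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /**
     * Picks the target topic from one column of the row and serializes the
     * remaining columns as a comma-separated payload. The row is modeled as a
     * plain map (column name to value); this is an assumption of the sketch.
     */
    static RoutedRecord serialize(Map<String, Object> row, String topicColumn, String defaultTopic) {
        Object target = row.get(topicColumn);
        // Fall back to the statically configured topic if the column is absent or null.
        String topic = (target == null) ? defaultTopic : target.toString();

        StringBuilder payload = new StringBuilder();
        for (Map.Entry<String, Object> entry : row.entrySet()) {
            if (entry.getKey().equals(topicColumn)) {
                continue; // the routing column is metadata, not part of the payload
            }
            if (payload.length() > 0) {
                payload.append(',');
            }
            payload.append(entry.getValue());
        }
        return new RoutedRecord(topic, payload.toString().getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("user_id", 42);
        row.put("event", "click");
        row.put("target_topic", "clicks");

        RoutedRecord record = serialize(row, "target_topic", "fallback");
        System.out.println(record.topic + " <- " + new String(record.value, StandardCharsets.UTF_8));
    }
}
```

In the actual connector, the serialization schema would presumably read the
topic from the metadata position of the row and pass it to
ProducerRecord(topic, ...), falling back to the configured topic otherwise.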
I opened https://issues.apache.org/jira/browse/FLINK-22748 to discuss this
further.
I hope this helps.
Regards,
Timo
On 20.05.21 12:43, Benoît Paris wrote:
> Hi all!
>
> I'm looking for a way to write to different Kafka topics based on some
> column value in SQL.
>
> I think it's possible with Java, using KafkaSerializationSchema,
> and ProducerRecord(topic, ...), but I was wondering if I could somewhat
> access that feature in SQL.
>
> I'm also trying to evaluate the amount of work required so that I
> implement it myself, subclassing the Kafka SQL connector just to add
> that feature.
>
> Another alternative for me is to try to preprocess the SQL, detect Kafka
> Sinks, force a DataStream conversion, then replace the Kafka SQL sink
> with an equivalent DataStream that has the topic routing. (but this
> feels rather brittle and maintenance-hard to me, rather than having the
> option in the SQL sink configuration)
>
> All comments/opinions/advice welcome!
> Cheers
> Ben
>