Kafka dynamic topic for Sink in SQL

Benoît Paris
Hi all!

I'm looking for a way to write to different Kafka topics based on some column value in SQL.

I think it's possible in Java, using KafkaSerializationSchema and ProducerRecord(topic, ...), but I was wondering whether I could somehow access that feature in SQL.

I'm also trying to evaluate how much work it would take to implement it myself, subclassing the Kafka SQL connector just to add that feature.

Another alternative for me is to preprocess the SQL, detect Kafka sinks, force a DataStream conversion, then replace the Kafka SQL sink with an equivalent DataStream sink that has the topic routing. (But this feels rather brittle and hard to maintain compared with having the option in the SQL sink configuration.)

All comments/opinions/advice welcome!
Cheers
Ben
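
For readers following along, the Java route mentioned above amounts to choosing the topic per record inside the serializer. The sketch below models only that routing decision with the standard library, so it runs without Flink or Kafka on the classpath. The class TopicRouting, the type Routed, the method route, and the column names event_type and payload are all hypothetical and not taken from the thread. In a real job the same logic would presumably live in KafkaSerializationSchema#serialize and return a ProducerRecord<byte[], byte[]> built with the chosen topic.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Stand-alone model of per-record topic routing. No Flink or Kafka classes are used;
// Routed is a hypothetical stand-in for Kafka's ProducerRecord<byte[], byte[]>.
class TopicRouting {

    /** Hypothetical stand-in for ProducerRecord: a target topic plus a serialized value. */
    static final class Routed {
        final String topic;
        final byte[] value;

        Routed(String topic, byte[] value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /**
     * Picks the target topic from one column of the row and serializes the
     * (assumed) "payload" column as UTF-8 bytes. Falls back to defaultTopic
     * when the routing column is missing or empty.
     */
    static Routed route(Map<String, String> row, String topicColumn, String defaultTopic) {
        String topic = row.get(topicColumn);
        if (topic == null || topic.isEmpty()) {
            topic = defaultTopic;
        }
        String payload = row.getOrDefault("payload", "");
        return new Routed(topic, payload.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Map<String, String> row = Map.of("event_type", "orders", "payload", "{\"id\":1}");
        Routed r = route(row, "event_type", "fallback");
        System.out.println(r.topic + " <- " + new String(r.value, StandardCharsets.UTF_8));
    }
}
```

The fallback topic is a design choice of this sketch: it keeps rows with a missing routing column from failing the sink, at the cost of silently collecting them in one place.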


Re: Kafka dynamic topic for Sink in SQL

Timo Walther
Hi Ben,

if I remember correctly, this topic has come up a couple of times. We
haven't implemented it yet, but the existing implementation can be easily
adapted for that. The "target topic" would be an additional persisted
metadata column in SQL terms. All you need to do is adapt

DynamicKafkaSerializationSchema

KafkaDynamicSink

for that.

I opened https://issues.apache.org/jira/browse/FLINK-22748 to discuss
this further.

I hope this helps.

Regards,
Timo


On 20.05.21 12:43, Benoît Paris wrote:

> Hi all!
>
> I'm looking for a way to write to different Kafka topics based on some
> column value in SQL.
>
> I think it's possible with Java, using KafkaSerializationSchema,
> and ProducerRecord(topic, ...), but I was wondering if I could somewhat
> access that feature in SQL.
>
> I'm also trying to evaluate the amount of work required so that I
> implement it myself, subclassing the Kafka SQL connector just to add
> that feature.
>
> Another alternative for me is to try to preprocess the SQL, detect Kafka
> Sinks, force a DataStream conversion, then replace the Kafka SQL sink
> with an equivalent DataStream that has the topic routing. (but this
> feels rather brittle and maintenance-hard to me, rather than having the
> option in the SQL sink configuration)
>
> All comments/opinions/advice welcome!
> Cheers
> Ben