Producing to Kafka topics dynamically without redeployment

Producing to Kafka topics dynamically without redeployment

Ahmed A.Hamid
Hello everyone,

I have a use-case where I need to have a Flink application produce to a variable number of Kafka topics (specified through configuration), potentially in different clusters, without having to redeploy the app. Let's assume I maintain the set of destination clusters/topics in config files, and have code in my Flink app to detect and reload any changes in these config files at runtime.

I have two questions:
  1. Is that a sound/reasonable thing to do? Or is it going to be riddled with issues?

  2. To implement that, should I write a custom SinkFunction that maintains a set of Kafka producers? Or a custom SinkFunction that delegates the work to a collection of FlinkKafkaProducer instances? Is there a better approach?
Thanks in advance.

Truly,
Ahmed

Re: Producing to Kafka topics dynamically without redeployment

Ejaskhan S
Hi Ahmed,

If you want to produce events to different topics dynamically and you have the logic to identify the target topic for each event, you can achieve this in the following way.

  • Suppose EVENT is your event after any transformation logic.
  • Suppose TOPIC_1 is the target topic for this event (assuming you have the logic available to identify the topic dynamically).

  • Create a new DataStream of a custom type carrying two attributes, topicName and event:

class TransformedEvent implements java.io.Serializable {
    String topicName;
    Event event;

    TransformedEvent(String topicName, Event event) { this.topicName = topicName; this.event = event; }
    String getTopicName() { return topicName; }
    Event getEvent() { return event; }
}
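
For example, the routing stream could be built with a simple map (a minimal sketch; eventStream and resolveTopic are placeholder names for your own source stream and routing logic):

// Assumed: eventStream is a DataStream<Event> and resolveTopic(...) encodes your routing rules.
DataStream<TransformedEvent> transformedStream =
        eventStream.map(event -> new TransformedEvent(resolveTopic(event), event));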

  • Create a serialization schema that resolves the target topic per record, as below:

import javax.annotation.Nullable;

import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

class CustomKafkaSchema implements KafkaSerializationSchema<TransformedEvent>,
        KafkaContextAware<TransformedEvent> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(TransformedEvent element, @Nullable Long timestamp) {
        // Serialize only the wrapped event; CustomSerializer stands in for your own serializer.
        byte[] serialized = new CustomSerializer().serialize(element.getEvent());
        // Resolve the topic per record; partition, timestamp and key are left to Kafka defaults.
        return new ProducerRecord<>(getTargetTopic(element), null, null, null, serialized);
    }

    @Override
    public String getTargetTopic(TransformedEvent element) {
        return element.getTopicName();
    }
}

  • Create the producer as below,

FlinkKafkaProducer<TransformedEvent> producer = new FlinkKafkaProducer<>(
        "DEFAULT",                     // default topic, used only if no target topic is resolved
        new CustomKafkaSchema(),
        producerConfiguration,         // standard Kafka producer Properties for the target cluster
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // or Semantic.AT_LEAST_ONCE
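
The producer is then attached as the sink of the routing stream (transformedStream is the placeholder name used in the sketch above):

// Attach the dynamic-topic producer as the job's sink.
transformedStream.addSink(producer).name("dynamic-topic-kafka-producer");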



Thanks
Ejas khan

Re: Producing to Kafka topics dynamically without redeployment

Ahmed A.Hamid
Thank you, Ejaskhan.

I think your suggestion would only work if all the topics were on the same Kafka cluster. In my use-case, the topics can be on different clusters, which is why I was thinking of rolling a custom sink that detects config changes and instantiates Kafka producers on demand as needed.
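
Roughly the shape I have in mind (only a sketch: config reloading, error handling, and Flink's checkpoint/transaction integration are all omitted, and RoutedEvent is a hypothetical carrier type, not from the earlier mail):

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Hypothetical carrier type: says where an already-serialized payload should go.
class RoutedEvent implements java.io.Serializable {
    String bootstrapServers; // target cluster
    String topic;            // target topic
    byte[] payload;          // serialized event

    String getBootstrapServers() { return bootstrapServers; }
    String getTopic() { return topic; }
    byte[] getPayload() { return payload; }
}

// Sketch of a sink that creates plain Kafka producers on demand, one per cluster.
// Note: this bypasses FlinkKafkaProducer, so it does not participate in checkpoints
// or transactions; delivery guarantees depend on how/when the producers are flushed.
class DynamicKafkaSink extends RichSinkFunction<RoutedEvent> {

    private transient Map<String, KafkaProducer<byte[], byte[]>> producers;

    @Override
    public void open(Configuration parameters) {
        producers = new ConcurrentHashMap<>();
    }

    @Override
    public void invoke(RoutedEvent value, Context context) {
        KafkaProducer<byte[], byte[]> producer =
                producers.computeIfAbsent(value.getBootstrapServers(), this::createProducer);
        producer.send(new ProducerRecord<>(value.getTopic(), value.getPayload()));
    }

    private KafkaProducer<byte[], byte[]> createProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", ByteArraySerializer.class.getName());
        props.setProperty("value.serializer", ByteArraySerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    @Override
    public void close() {
        if (producers != null) {
            producers.values().forEach(KafkaProducer::close);
        }
    }
}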



Re: Producing to Kafka topics dynamically without redeployment

Ejaskhan S
Hi Ahmed,

If you have the logic to identify the destination cluster as well as the target topic, you can still achieve this with the above solution:

1. Create one Kafka producer per cluster: if there are 10 clusters, create 10 producers.

2. Add a new attribute, e.g. 'clusterId' (or something more meaningful), to TransformedEvent to identify the target cluster.

3. Split the DataStream<TransformedEvent> by clusterId using filters, and attach the corresponding cluster's producer to each filtered stream, as in the sketch below.
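
A rough sketch of step 3 (the cluster IDs, broker addresses, and stream names are placeholders; transformedStream, CustomKafkaSchema, and TransformedEvent are from the earlier mail, with getClusterId() assumed to be added):

// Assumed: transformedStream is a DataStream<TransformedEvent> and TransformedEvent now exposes getClusterId().
Properties producerConfigA = new Properties();
producerConfigA.setProperty("bootstrap.servers", "cluster-a-broker:9092"); // placeholder address
Properties producerConfigB = new Properties();
producerConfigB.setProperty("bootstrap.servers", "cluster-b-broker:9092"); // placeholder address

DataStream<TransformedEvent> clusterAStream =
        transformedStream.filter(e -> "cluster-a".equals(e.getClusterId()));
DataStream<TransformedEvent> clusterBStream =
        transformedStream.filter(e -> "cluster-b".equals(e.getClusterId()));

clusterAStream.addSink(new FlinkKafkaProducer<>(
        "DEFAULT", new CustomKafkaSchema(), producerConfigA, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
clusterBStream.addSink(new FlinkKafkaProducer<>(
        "DEFAULT", new CustomKafkaSchema(), producerConfigB, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));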


Thanks
Ejaskhan
