Flink Kafka producer with a topic per message

Sanne de Roever
Hi,

Kafka producer clients for 0.10 allow the following syntax:

producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

The gist is that one producer can send messages to different topics; this is useful for event routing, among other things. It makes the creation of generic endpoints easier. If I am right, Flink currently does not support this; would this be a useful addition?
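
The idea of a topic per message can be sketched without a broker. This is a minimal illustration under stated assumptions, not Kafka's or Flink's API: `Outgoing` is a hypothetical stand-in for `ProducerRecord`, the topic names `even-numbers` and `odd-numbers` are made up, and an in-memory list stands in for a real producer. It only shows that the topic is chosen per message rather than per producer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka's ProducerRecord: topic, key and value only.
final class Outgoing {
    final String topic;
    final String key;
    final String value;

    Outgoing(String topic, String key, String value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

final class TopicPerMessage {
    // Routing rule (an assumption for illustration): derive the topic from the message.
    static String topicFor(int i) {
        return (i % 2 == 0) ? "even-numbers" : "odd-numbers";
    }

    // Builds one record per message. With a real client, each record would be
    // handed to producer.send(new ProducerRecord<>(topic, key, value)) instead.
    static List<Outgoing> route(int count) {
        List<Outgoing> sent = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String s = Integer.toString(i);
            sent.add(new Outgoing(topicFor(i), s, s));
        }
        return sent;
    }

    public static void main(String[] args) {
        for (Outgoing r : route(4)) {
            System.out.println(r.topic + " <- " + r.value);
        }
    }
}
```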

Cheers,

Sanne

Re: Flink Kafka producer with a topic per message

Stephan Ewen
You are right, it does not exist, and it would be a nice addition.

Can you sketch some details on how to do that?

  - Will it be a new type of producer? If yes, can as much of the code as possible be shared between the current and the new producer?
  - Will it only be part of the Flink Kafka 0.10 producer?

Thanks,
Stephan



On Tue, Dec 6, 2016 at 2:23 PM, Sanne de Roever <[hidden email]> wrote:
If I am right, Flink currently does not support this; would this be a useful addition?


Re: Flink Kafka producer with a topic per message

Sanne de Roever
Good questions, I will follow up piece-wise to address the different questions. Could a Wiki section be an idea, before I spread the information across several posts?

On Tue, Dec 6, 2016 at 4:50 PM, Stephan Ewen <[hidden email]> wrote:
Can you sketch some details on how to do that?



Re: Flink Kafka producer with a topic per message

Sanne de Roever
A first sketch

Central to this functionality is Kafka's ProducerRecord. ProducerRecord was introduced in Kafka 0.8, which means that the functionality could be introduced for all Flink-Kafka connectors; see https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/connectors/kafka.html
ProducerRecord does two things:
  • It allows a Kafka producer to send messages to different topics in Kafka; this can be very helpful for message routing (I can make a more formal example later).
  • It also allows specifying a key that determines the partition of the message; introducing this would give Flink a more generic interface to Kafka, which is a good thing. A partition can be identified by an integer or by a key String that will be hashed.
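
The two ways of identifying a partition mentioned in the list above can be sketched as follows. This is an illustration only: `PartitionChoice` and `partitionFor` are hypothetical names, and `String.hashCode()` is a stand-in for the hash. Kafka's default partitioner hashes the serialized key bytes with murmur2, so the partition numbers produced here will not match a real cluster.

```java
import java.util.Objects;

final class PartitionChoice {
    // An explicit partition id wins; otherwise the key is hashed onto a partition.
    // Assumption: String.hashCode() replaces Kafka's murmur2 hash of the key bytes.
    static int partitionFor(Integer explicitPartition, String key, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition;
        }
        Objects.requireNonNull(key, "either a partition or a key is required");
        // floorMod keeps the result in [0, numPartitions) even for negative hashes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor(2, "user-42", 4));    // explicit partition id
        System.out.println(partitionFor(null, "user-42", 4)); // derived from the key
    }
}
```

The useful property is that the same key always lands on the same partition, which preserves per-key ordering.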
The next step would be to determine the impact on the interface of a Sink. Currently a Kafka sink has one topic, for example:

.addSink(new FlinkKafkaProducer09[String](outputTopic, new SimpleStringSchema(), producerProps))

In the new scenario one would like to pass not only the message to be sent, but also a topic string and a partition id or key (tuple-ish?). The next suggestion is just to start the thinking a bit; a shot in the dark. A somewhat blunt approach would be to map all messages to a valid ProducerRecord, and then to pass this ProducerRecord to the Sink, and the rest is history. No attempt at abstraction is made, the reasoning being as follows.

Evaluating this, I see the following. The current KafkaSink abstracts the Kafka functionality out on the Flink side. This is a good thing, and will work for most cases. Providing a tighter integration with Kafka will probably break down the abstraction. This seems to point in the direction of creating an advanced Kafka Sink. This sink gives more control, but less abstraction; it is for advanced applications. Any attempt at abstraction will only create less transparency as far as I can see. The contract would not likely work on other queuing providers.



On Wed, Dec 7, 2016 at 10:27 AM, Sanne de Roever <[hidden email]> wrote:
Good questions, I will follow up piece-wise to address the different questions. Could a Wiki section be an idea, before I spread the information across several posts?





Re: Flink Kafka producer with a topic per message

Sanne de Roever
Having had a glass of water, the following option came up.

Having more advanced Sink integrations is likely to be a more general concern. It would be better to have a smoother path from the cleaner abstraction to the advanced case. A more general proposal would be to alter the Sink interface so that an optional key-value map can be passed with each message. This optional key-value map would allow the sink to alter its behavior given the hints in the map.
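
A rough sketch of what such a hint-carrying sink contract could look like. Everything here is hypothetical: `HintedSink`, `TopicHintSink` and the `"topic"` hint key are invented for illustration and are not part of Flink's sink API; the in-memory list stands in for the external system.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sink contract: every message may carry optional hints.
interface HintedSink<T> {
    void invoke(T value, Map<String, String> hints);
}

// Example sink that understands a "topic" hint and ignores all other hints.
final class TopicHintSink implements HintedSink<String> {
    private final String defaultTopic;
    final List<String> log = new ArrayList<>(); // stands in for the external system

    TopicHintSink(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    @Override
    public void invoke(String value, Map<String, String> hints) {
        // Without a hint the sink behaves like the simple single-topic case.
        String topic = hints.getOrDefault("topic", defaultTopic);
        log.add(topic + ":" + value);
    }

    public static void main(String[] args) {
        TopicHintSink sink = new TopicHintSink("events");
        sink.invoke("a", Collections.emptyMap());
        sink.invoke("b", Collections.singletonMap("topic", "alerts"));
        System.out.println(sink.log);
    }
}
```

A sink that does not recognize a hint simply ignores it, which is what would keep the simple case unchanged while leaving room for the advanced one.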

On Wed, Dec 7, 2016 at 10:55 AM, Sanne de Roever <[hidden email]> wrote:
This sink gives more control, but less abstraction; it is for advanced applications.





Re: Flink Kafka producer with a topic per message

Tzu-Li (Gordon) Tai
In reply to this post by Sanne de Roever
Hi,

The FlinkKafkaProducers currently support a per-record topic through the
user-provided serialization schema, which has a "getTargetTopic(T element)" method that is called for each record.
They also decide the partition each record is sent to through a custom KafkaPartitioner, which is likewise provided
by the user when creating a FlinkKafkaProducer.
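
A minimal sketch of the per-record topic mechanism described above. `TopicAwareSchema` is a local stand-in so the example compiles on its own; only the `getTargetTopic(T element)` method name comes from the description, and the real Flink interface, its package and its other methods should be checked against the Flink version in use. The routing rule and the topic names `errors` and `logs` are made up.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in for the user-provided serialization schema; only
// getTargetTopic(T element) is taken from the description above.
interface TopicAwareSchema<T> {
    byte[] serializeValue(T element);

    String getTargetTopic(T element);
}

// Routes log lines by severity prefix (a made-up rule for illustration).
final class SeverityRoutingSchema implements TopicAwareSchema<String> {
    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        // Called once per record, so every record can go to a different topic.
        return element.startsWith("ERROR") ? "errors" : "logs";
    }

    public static void main(String[] args) {
        SeverityRoutingSchema schema = new SeverityRoutingSchema();
        System.out.println(schema.getTargetTopic("ERROR disk full"));
        System.out.println(schema.getTargetTopic("INFO started"));
    }
}
```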

Does this already provide the functionality you’ve mentioned? Or have I misunderstood what you have in mind?

Cheers,
Gordon


On December 7, 2016 at 5:55:24 PM, Sanne de Roever ([hidden email]) wrote:

The next step would be to determine the impact on the interface of a Sink. Currently a Kafka sink has one topic, for example:


Re: Flink Kafka producer with a topic per message

Sanne de Roever
Hi Gordon,

Sounds very close, I will have a look; thanks.

Cheers,

Sanne

On Wed, Dec 7, 2016 at 11:09 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Does this already provide the functionality you’ve mentioned?



Re: Flink Kafka producer with a topic per message

Sanne de Roever
Hi Gordon,

Yes, this has been addressed in 1.0.0, and in a very nice way. Thank you.

</CloseThread>

Cheers,

Sanne

On Wed, Dec 7, 2016 at 11:11 AM, Sanne de Roever <[hidden email]> wrote:
Sounds very close, I will have a look; thanks.