Kafka and parallelism

Kafka and parallelism

Christophe Jolif
Hi,

If I'm sourcing from a KafkaConsumer, do I have to explicitly set the Flink job parallelism to the number of partitions, or will it adjust automatically? In other words, if I don't call setParallelism, will I get a parallelism of 1 or the number of partitions?

The reason I'm asking is that I'm listening to a topic pattern, not a single topic, and the number of actual topics (and thus partitions) behind the pattern can change, so it is not possible to know ahead of time how many partitions I will get.

Thanks!
--
Christophe

Re: Kafka and parallelism

Tzu-Li (Gordon) Tai
Hi Christophe,

You can set the parallelism of the FlinkKafkaConsumer independently of the total number of Kafka partitions (across all subscribed streams, including newly created streams that match a subscribed pattern).

The consumer deterministically assigns each partition to a single consumer subtask, in a round-robin fashion.
E.g. if the parallelism of your FlinkKafkaConsumer is 2 and there are 6 partitions, each consumer subtask will be assigned 3 partitions.
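The distribution can be sketched in plain Java. Note this is only an illustration of the round-robin idea (a simple modulo scheme), not Flink's exact internal assignment code:

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinSketch {

    // Illustrative only: assign partition p to subtask (p % parallelism).
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> subtasks = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            subtasks.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            subtasks.get(p % parallelism).add(p);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // Parallelism 2, 6 partitions: each subtask ends up with 3 partitions.
        System.out.println(assign(6, 2)); // [[0, 2, 4], [1, 3, 5]]
    }
}
```

With fewer subtasks than partitions, some subtasks read several partitions; with more subtasks than partitions, the extra subtasks simply stay idle.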

As for topic pattern subscription, the FlinkKafkaConsumer supports this feature starting from version 1.4.0. You can take a look at [1] for how to do that.
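A rough sketch of pattern subscription. The runnable part below only demonstrates the regex matching itself; the Flink constructor call and the partition-discovery property are shown in comments, and the topic names are made up for illustration:

```java
import java.util.regex.Pattern;

public class TopicPatternSketch {
    public static void main(String[] args) {
        // Topics matching this pattern are subscribed, including topics
        // created after the job starts (when partition discovery is enabled).
        Pattern topicPattern = Pattern.compile("origin-topic-[0-9]+");

        System.out.println(topicPattern.matcher("origin-topic-1").matches()); // true
        System.out.println(topicPattern.matcher("other-topic").matches());    // false

        // With Flink 1.4+, the pattern is passed to the consumer constructor
        // (sketch; requires the flink-connector-kafka dependency):
        //
        // Properties props = new Properties();
        // props.setProperty("bootstrap.servers", "localhost:9092");
        // props.setProperty("flink.partition-discovery.interval-millis", "10000");
        // FlinkKafkaConsumer011<String> consumer =
        //     new FlinkKafkaConsumer011<>(topicPattern, new SimpleStringSchema(), props);
    }
}
```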

Hope this helps!

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery

Re: Kafka and parallelism

Christophe Jolif
Thanks. It helps indeed.

I guess the one point it does not explicitly answer is: does just creating a Kafka consumer reading from multiple partitions set the parallelism to the number of partitions? But reading between the lines, I think the answer is clearly no. You have to set the parallelism yourself, and the partitions will then be assigned round-robin across the subtasks.

Thanks again,
--
Christophe

Re: Kafka and parallelism

Tzu-Li (Gordon) Tai
Yes, the answer to that would be no.

If you do not explicitly set a parallelism for the consumer, it will default to the parallelism of the job, which is independent of how many Kafka partitions there are.

Cheers,
Gordon

Re: Kafka and parallelism

Christophe Jolif
Hi Gordon, or anyone else reading this,

Still on this idea of consuming a Kafka topic pattern.

I then want to sink the result of the processing into a set of topics depending on where the original message came from (i.e. if it comes from origin-topic-1 I will serialize the result to destination-topic-1, if from topic-2 to topic-2, etc.). However, the KafkaProducer works on a fixed topic. You can provide a partitioning function (FlinkKafkaPartitioner) but not a "topic" function that would let you decide to which topic to send the message, a bit like a BucketingSink decides the bucket or an ElasticsearchSinkFunction lets you choose the index.

Am I missing something? The reason I'm asking is that some of the sink constructors mention "defaultTopicId" and some "topicId", as if in some cases there were an ability to override the topic. Is there a feature that allows me to do that?

If not do you think this would be a worthwhile addition?

Thanks again,
--
Christophe

Re: Kafka and parallelism

Tzu-Li (Gordon) Tai
Hi Christophe,

Yes, you can achieve writing to different topics per-message using the `KeyedSerializationSchema` provided to the Kafka producer.
The schema interface has a `getTargetTopic` method which allows you to override the default target topic for a given record.
I agree that the method is somewhat odd to be part of the serialization schema, so I have also been thinking about moving that elsewhere (maybe as part of the partitioner).

If you want to route a record to some topic depending on which topic it came from on the consumer side, you’ll have to wrap the source topic information within the records so that it is available to the producer.
You can access that in the `KeyedDeserializationSchema#deserialize` method, which exposes information about which topic and partition each record came from.
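A minimal sketch of the routing idea. The mapping rule below (origin-* to destination-*) is just the example from this thread; in the real job it would live inside the `getTargetTopic` of a KeyedSerializationSchema, shown in comments with a hypothetical MyRecord type:

```java
public class TopicRoutingSketch {

    // Example rule from this thread: origin-topic-N -> destination-topic-N.
    static String targetTopic(String sourceTopic) {
        return sourceTopic.replaceFirst("^origin-", "destination-");
    }

    public static void main(String[] args) {
        System.out.println(targetTopic("origin-topic-1")); // destination-topic-1

        // On the producer side, this rule would go into a
        // KeyedSerializationSchema (sketch; MyRecord is a hypothetical type
        // that carries the source topic captured during deserialization):
        //
        // new KeyedSerializationSchema<MyRecord>() {
        //     public byte[] serializeKey(MyRecord r)   { return null; }
        //     public byte[] serializeValue(MyRecord r) { return r.payload(); }
        //     public String getTargetTopic(MyRecord r) {
        //         return targetTopic(r.sourceTopic()); // overrides the default topic
        //     }
        // };
    }
}
```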

Cheers,
Gordon

Re: Kafka and parallelism

Christophe Jolif
Ok thanks! I should have seen this. Sorry.

--
Christophe