FlinkKafkaProducerXX

FlinkKafkaProducerXX

Mikhail Pryakhin-2
Hi all,
I've just come across a FlinkKafkaProducer misconfiguration issue: when a
FlinkKafkaProducer is created without specifying a Kafka partitioner, a
FlinkFixedPartitioner instance is used, and all messages end up in a single
Kafka partition (in my case, with a single task manager instance). I'm just
curious whether this was done on purpose.
Don't you consider a round-robin strategy more suitable for this case? For
now, if I need a round-robin strategy, I have to explicitly pass a null
value for the partitioner.
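To illustrate, here is roughly what the two cases look like (a sketch
against the Flink 1.3/1.4-era FlinkKafkaProducer010 constructors; the
topic name, properties, and class name are placeholders of mine):

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class ProducerPartitioningExample {

        static void attachSinks(DataStream<String> stream, Properties kafkaProps) {
            // No partitioner argument: FlinkFixedPartitioner is used, and each
            // sink subtask writes all of its records to one Kafka partition.
            stream.addSink(new FlinkKafkaProducer010<>(
                    "my-topic", new SimpleStringSchema(), kafkaProps));

            // Explicit null: the Flink-side partitioner is disabled, and records
            // are partitioned by the hash of their attached key instead.
            stream.addSink(new FlinkKafkaProducer010<>(
                    "my-topic", new SimpleStringSchema(), kafkaProps,
                    (FlinkKafkaPartitioner<String>) null));
        }
    }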

Thanks in advance.

Kind Regards,
Mike Pryakhin


Re: FlinkKafkaProducerXX

Tzu-Li (Gordon) Tai
Hi Mike,

The rationale behind making the FlinkFixedPartitioner the default is that
each Flink sink partition (i.e., one parallel sink subtask) maps to a
single Kafka partition.
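For reference, the fixed partitioner's logic boils down to roughly the
following (a simplified sketch, not the exact FlinkFixedPartitioner
source):

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    // Simplified sketch of what FlinkFixedPartitioner does.
    public class FixedPartitionerSketch<T> extends FlinkKafkaPartitioner<T> {

        private int parallelInstanceId;

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            // Remember the index of this parallel sink subtask.
            this.parallelInstanceId = parallelInstanceId;
        }

        @Override
        public int partition(T record, byte[] key, byte[] value,
                             String targetTopic, int[] partitions) {
            // Every record from this subtask lands in the same Kafka partition,
            // so a job with sink parallelism 1 fills only a single partition.
            return partitions[parallelInstanceId % partitions.length];
        }
    }

Since the subtask index is fixed for the lifetime of the job, a sink with
parallelism 1 always picks the same partition, which is the behavior you
observed.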

One other thing to clarify:
when the partitioner is set to null, partitioning is based on a hash of
the record's attached key (the key produced by the
`KeyedSerializationSchema`), not round-robin.
To use round-robin partitioning, a custom partitioner has to be provided.
Note, however, that a round-robin partitioner opens network connections
from every parallel sink subtask to all Kafka brokers, which can add up
to quite a lot of connections.
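Such a custom round-robin partitioner could look roughly like this (a
hypothetical sketch; the class name is mine, and this is not a Flink
built-in):

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    // Hypothetical round-robin partitioner sketch.
    public class RoundRobinPartitionerSketch<T> extends FlinkKafkaPartitioner<T> {

        private int next = -1;

        @Override
        public int partition(T record, byte[] key, byte[] value,
                             String targetTopic, int[] partitions) {
            // Cycle through all partitions of the target topic, which is why
            // every subtask eventually connects to all partition leaders.
            next = (next + 1) % partitions.length;
            return partitions[next];
        }
    }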

To conclude, I think the appropriate partitioning scheme depends on the
actual use case.
For example, for a simple Flink job that only filters data and has no
aggregation in the pipeline, key-hash-based partitioning would probably
be the better fit.
For more complex pipelines that already partition the computation by key,
a direct mapping of each Flink sink subtask to a Kafka partition could
make sense.

On the other hand, considering that the key for each record is always
re-calculated by the `KeyedSerializationSchema` in each Flink Kafka
producer sink subtask anyway, it might make sense to make the key-hash
partitioner the default instead.

Cheers,
Gordon

Re: FlinkKafkaProducerXX

Stephan Ewen
Sounds like adding a round-robin partitioner to the set of readily available partitioners would make sense.

Re: FlinkKafkaProducerXX

Mikhail Pryakhin-2
Exactly. At the very least, it's worth mentioning in the Javadoc which partitioner is used by default when none is specified, because the default behavior might not be obvious.


Kind Regards,
Mike Pryakhin