Re: Kafka KeyedStream source

Posted by Tzu-Li (Gordon) Tai
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Kafka-KeyedStream-source-tp10883p10916.html

Hi Niels,

Thank you for bringing this up. I recall there was some previous discussion related to this: [1].

I don’t think this is possible at the moment, mainly because of how the API is designed.

On the other hand, a KeyedStream in Flink is essentially just a DataStream with a hash partitioner that decides which parallel instance of the downstream operator each emitted record is sent to.
So, even if we had a Kafka source that directly produced a KeyedStream on “addSource”, redistribution of data could still happen: if the parallelism of the compute operators right after the source differs from the number of Kafka partitions, the records must be redistributed so that the key space and state are spread evenly across Flink.
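
To make this concrete, here is a rough sketch in Java (Flink 1.2-era connector API; the “clicks” topic name, the CSV payload layout, and the sessionId extraction are all illustrative assumptions). Even though Kafka already grouped the records by sessionId, the keyBy re-partitions them with Flink’s own hash function:

    import java.util.Properties;

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class ClickStreamJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "clickstream");

            // Kafka already partitioned "clicks" by sessionId, but Flink only
            // sees a plain, un-keyed DataStream coming out of the source.
            DataStream<String> raw = env.addSource(
                new FlinkKafkaConsumer010<>("clicks", new SimpleStringSchema(), props));

            // keyBy installs Flink's own hash partitioner: each record is routed
            // to a downstream subtask by key hash, so records can leave the
            // subtask that consumed their Kafka partition.
            KeyedStream<String, String> bySession = raw.keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String line) {
                    return line.split(",", 2)[0]; // sessionId assumed to be the first field
                }
            });

            bySession.print();
            env.execute("click-stream-sketch");
        }
    }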

This leads to the question of whether retaining the original partitioning of records in Kafka, when they are consumed by Flink, is actually just a special case.
Flink, as a parallel compute engine, can freely adjust the parallelism of its operators regardless of the parallelism of Kafka topics (rescaling isn’t actually in yet, but is on the near-future roadmap).

So, in the general case, the parallelism of a Flink operator may differ from the number of Kafka partitions, and redistribution must therefore occur.
For redistribution to be unnecessary right after an already-partitioned Kafka topic, you would need identical numbers of 1) Kafka partitions, 2) Flink source instances consuming those partitions, and 3) parallel instances of the keyed computation that follows. This seems like a very specific situation, considering that you’ll want to be able to rescale Flink operators as the data’s key space / volume grows.
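
Continuing the sketch above, pinning everything to one parallelism would look roughly like this (the partition count of 8 is an assumption; note that aligning the numbers only makes the shuffle logically unnecessary — with the current API, keyBy still re-hashes the records):

    // Continuing the earlier sketch (same env and props; additionally needs
    // org.apache.flink.api.common.functions.MapFunction on the classpath).
    final int numKafkaPartitions = 8; // assumption: "clicks" has 8 partitions

    DataStream<String> raw = env
        .addSource(new FlinkKafkaConsumer010<>("clicks", new SimpleStringSchema(), props))
        .setParallelism(numKafkaPartitions); // one source subtask per partition

    raw.keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String line) {
                return line.split(",", 2)[0]; // sessionId assumed first field
            }
        })
        .map(new MapFunction<String, String>() { // stand-in for the real per-session computation
            @Override
            public String map(String event) {
                return event;
            }
        })
        .setParallelism(numKafkaPartitions); // keyed computation matches the partition count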

The main observation, I think, is that Flink itself maintains how the key space is partitioned within the system, and that plays a crucial part in rescaling. That’s why, by default, it doesn’t respect the existing partitioning of the key space in Kafka (or other external sources). Even if the partitioning happened to match at the beginning of a job, it would most likely change as you rescale your job / operators (which is a good thing, because it lets the job adapt).

Cheers,
Gordon


On January 6, 2017 at 1:38:05 AM, Niels Basjes ([hidden email]) wrote:

Hi,

In my scenario I have click stream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the same sessionId into the same Kafka partition. That way I already have all events of a visitor in a single Kafka partition, in a fixed order.
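
For reference, the producer side looks roughly like this (the “clicks” topic name and the event payload are illustrative):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ClickProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                String sessionId = "session-42";              // hypothetical session id
                String event = sessionId + ",pageview,/home"; // hypothetical payload

                // Using sessionId as the record key: Kafka's default partitioner
                // hashes the key, so all events of one session land in the same
                // partition, preserving their order within that partition.
                producer.send(new ProducerRecord<>("clicks", sessionId, event));
            }
        }
    }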

When I read this data into Flink I get a generic data stream, on top of which I have to do a keyBy before my processing can continue. Such a keyBy will redistribute the data again to the later tasks that do the actual work.

Is it possible to create an adapted version of the Kafka source that immediately produces a keyed data stream?

--
Best regards / Met vriendelijke groeten,

Niels Basjes