I am wondering if the Kafka connectors leverage Kafka message keys at all?
Looking through the docs, my impression is that they do not. E.g. if I use the connector to consume from a partitioned Kafka topic, what I get back is a DataStream, rather than a KeyedStream. And if I want access to a message's key, the key must be embedded within the message body so I can extract it there, or I have to make use of a KeyedDeserializationSchema with the connector to access the Kafka message key and insert it into the type returned by the connector. Similarly, it would seem that you have to give the Kafka producer sink a KeyedSerializationSchema, which obtains a Kafka key and a Kafka message from the events of a DataStream, but you cannot produce from a KeyedStream where the key is obtained from the stream itself. Is this correct?
Hi! You are right with your observations. Right now, you would have to create a "Tuple2<Key, Value>" in the KeyedDeserializationSchema. That is also what a KeyedStream holds internally. A KeyedStream in Flink is more than just a stream that has a key and a value: it is also partitioned by the key, and Flink keeps track of keyed state in those streams. That's why it has to be created explicitly. For convenience, one could add a variant where FlinkKafkaConsumer accepts two DeserializationSchemas (one for the key, one for the value) and returns a Tuple2<Key, Value> automatically.

Greetings, Stephan

On Sun, Apr 10, 2016 at 5:49 AM, Elias Levy <[hidden email]> wrote:
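A minimal sketch of such a KeyedDeserializationSchema, assuming UTF-8 string keys and values; the class name KeyValueDeserializer is illustrative, not from the thread:

<code>
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Exposes the Kafka message key alongside the value as a Tuple2.
public class KeyValueDeserializer implements KeyedDeserializationSchema<Tuple2<String, String>> {

    @Override
    public Tuple2<String, String> deserialize(byte[] messageKey, byte[] message,
            String topic, int partition, long offset) throws IOException {
        // Kafka allows null keys and values, so guard against both.
        String key = messageKey == null ? null : new String(messageKey, StandardCharsets.UTF_8);
        String value = message == null ? null : new String(message, StandardCharsets.UTF_8);
        return Tuple2.of(key, value);
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false; // consume indefinitely
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
}
</code>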
Hi Stephan, If we were to do that, would Flink leverage the fact that Kafka has already partitioned the data by the key, or would Flink attempt to shuffle the data again into its own partitions, potentially shuffling data between machines for no gain? Thanks, Andy

On Sun, 10 Apr 2016, 13:22 Stephan Ewen, <[hidden email]> wrote:
Hi!

You can exploit that, yes. If you read data from Kafka in Flink, a Kafka partition is "sticky" to a Flink source subtask. If you have (kafka-source => mapFunction), for example, you can be sure that all values with the same key go through the same parallel mapFunction subtask. If you maintain a HashMap in there, you basically have state by key, based on the Kafka partitions.

If you want to use Flink's internal key/value state, however, you need to let Flink re-partition the data by using "keyBy()". That is because Flink's internal sharding of state (including the re-sharding to adjust parallelism we are currently working on) follows a dedicated hashing scheme which is in all likelihood different from the partition function that writes the key/value pairs to the Kafka topics.

Hope that helps...

Greetings, Stephan

On Wed, Apr 13, 2016 at 9:20 AM, Andrew Coates <[hidden email]> wrote:
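As an illustration of the HashMap approach described above, a sketch of per-key state kept in a plain HashMap inside a RichMapFunction, relying purely on Kafka's partitioning; the Tuple2<key, value> input type and the CountPerKey name are assumptions. Note that unlike Flink's keyed state, this map is not checkpointed or rescaled by Flink:

<code>
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Counts records per key in a local HashMap. This works only because each
// Kafka partition (and hence each key) is "sticky" to one parallel subtask.
public class CountPerKey extends RichMapFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    private transient Map<String, Long> counts;

    @Override
    public void open(Configuration parameters) {
        counts = new HashMap<>();
    }

    @Override
    public Tuple2<String, Long> map(Tuple2<String, String> record) {
        long count = counts.merge(record.f0, 1L, Long::sum);
        return Tuple2.of(record.f0, count);
    }
}
</code>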
That is interesting, if somewhat disappointing. I was hoping that performing a keyBy from a Kafka source would perform no reshuffling if you used the same value as you used for the Kafka message key. But it makes sense if you are using different hash functions. It may be useful to have a variant of keyBy() that converts the stream to a KeyedStream but performs no shuffling if the caller is certain that the DataStream is already partitioned by the given key.

On Wed, Apr 13, 2016 at 2:10 AM, Stephan Ewen <[hidden email]> wrote:
If I can throw in my 2 cents, I agree with what Elias says. Without that feature (not re-partitioning already-partitioned Kafka data), Flink is in a bad position for common, simpler processing jobs that don't involve any shuffling at all, for example a simple readKafka-enrich-writeKafka pipeline. Systems like the new Kafka Streams processing library, which leverage Kafka partitioning, will probably beat Flink in performance (of course, it's just an intuition).
Are you planning to provide such a feature? Is it simple to do with Flink's current engine and API?

On Thu, Apr 14, 2016 at 03:11, Elias Levy <[hidden email]> wrote:
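For reference, a sketch of the kind of pipeline described above, assuming the Kafka 0.9 connector of that era and string-valued records; the topic names, broker address, and the enrichment step are placeholders:

<code>
import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReadEnrichWrite {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "enricher");

        // Read from Kafka; each Kafka partition is sticky to one source subtask.
        DataStream<String> in = env.addSource(
                new FlinkKafkaConsumer09<>("input-topic", new SimpleStringSchema(), props));

        // Enrich without any keyBy(), so no shuffle occurs: records stay on
        // the subtask that read their Kafka partition.
        DataStream<String> enriched = in.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value + "|enriched";
            }
        });

        // Write back to Kafka.
        enriched.addSink(new FlinkKafkaProducer09<>(
                "localhost:9092", "output-topic", new SimpleStringSchema()));

        env.execute("readKafka-enrich-writeKafka");
    }
}
</code>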
Hi!
@Krzysztof: If you use a very simple program like "read kafka" => "enrich (map / flatmap)" => "write kafka", then there will be no shuffle in Flink either. It will be a very lightweight program, reusing the Kafka partitioning.

@Elias: keyBy() assumes that the partitioning can be altered by Flink (see key groups and changing the parallelism of programs: https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit ).

That being said, the DataStream API is a bit hackable: you can create a keyed transformation without doing a keyBy() operation, meaning you can operate on windows and key/value state, and it will use whatever partitioning already exists (for example the one from Kafka). The way to do this is to manually inject the state key selector into the operator. For example, to give a MapFunction access to key/value state, one can do the following:

<code>
// original stream
DataStream<Tuple2<String, String>> fromKafka = ...;

// operation that should get keyed state
DataStream<Tuple2<String, Double>> result = fromKafka.flatMap(...);

// make the operation aware of the keys
OneInputTransformation<Tuple2<String, String>, Tuple2<String, Double>> transform =
    (OneInputTransformation<Tuple2<String, String>, Tuple2<String, Double>>) result.getTransformation();
transform.setStateKeySelector((tuple) -> tuple.f0);
transform.setStateKeyType(BasicTypeInfo.STRING_TYPE_INFO);
</code>

Creating a window on a non-keyed stream takes slightly more lines of code, but is totally doable as well. If this is something desirable, I could see adding some utils to the API extensions that allow you to do these kinds of things in a simple way.

One more observation is that a shuffle in Flink is cheaper than a shuffle through Kafka (writing across partitions). It involves no disk, efficient serialization, and has nice back-pressure behavior.

Greetings, Stephan

On Thu, May 12, 2016 at 10:28 PM, Krzysztof Zarzycki <[hidden email]> wrote:
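To show how the injected key selector above would be used, a sketch of a flatMap body that reads and updates Flink key/value state; the EnrichWithState name, the running-sum logic, and the assumption that values parse as doubles are all illustrative, not from the thread:

<code>
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keeps a running sum per key in Flink's key/value state. This only works
// once the state key selector has been injected as shown above, since the
// operator must know which field of the record is the key.
public class EnrichWithState extends RichFlatMapFunction<Tuple2<String, String>, Tuple2<String, Double>> {

    private transient ValueState<Double> runningSum;

    @Override
    public void open(Configuration parameters) {
        runningSum = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sum", Double.class, 0.0));
    }

    @Override
    public void flatMap(Tuple2<String, String> in, Collector<Tuple2<String, Double>> out) throws Exception {
        double sum = runningSum.value() + Double.parseDouble(in.f1);
        runningSum.update(sum);
        out.collect(Tuple2.of(in.f0, sum));
    }
}
</code>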