Hi there,
It seems it is not possible to use a custom partitioner in the context of a KeyedStream without modifying KeyedStream itself:

    protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
        throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
    }

In some situations, such as when the number of keys is small and close to the number of partitions, using keyBy(<keyExtractor>).window(<windowAssigner>).<windowOperation> might result in collisions in the partition indexes (and hence empty partitions) assigned by the HashPartitioner that is imposed on the KeyedStream:

    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
        super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
                dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
        this.keySelector = keySelector;
        this.keyType = keyType;
    }

This is due to the characteristics of the underlying hash function (any hash function, for that matter):

    returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;

A small sketch illustrating the effect follows at the end of this message.

Is there a particular reason to force the KeyedStream to use a HashPartitioner?

Thanks in advance and best regards.
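To make the collision behaviour concrete, here is a minimal standalone sketch (not part of the original message) that applies the same channel computation as the quoted line to a handful of made-up String keys. It assumes Flink's flink-core is on the classpath for MathUtils; the key values and the channel count are hypothetical.

    // Illustrative only: shows how a small key set can map unevenly onto an
    // equally small number of output channels. Keys and channel count are made up.
    import org.apache.flink.util.MathUtils;

    public class ChannelCollisionDemo {

        public static void main(String[] args) {
            String[] keys = {"alpha", "beta", "gamma", "delta"}; // hypothetical keys
            int numberOfOutputChannels = 4;                      // parallelism equal to the number of keys

            for (String key : keys) {
                // Same formula as the quoted Flink source: murmurHash(hashCode) % channels.
                int channel = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
                System.out.println(key + " -> channel " + channel);
            }
            // With as many channels as keys, an even spread is unlikely: some keys
            // typically collide on the same channel, leaving other channels empty.
        }
    }

Running it with different key sets shows that whenever two keys hash to the same channel index, one channel receives two keys' traffic while another stays idle, which is exactly the empty-partition effect described above.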
Hi Philippe,
There is no particular reason other than that hash partitioning is a sensible default for most users, and this rarely seems to be an issue. When the number of keys is close to the parallelism, having idle partitions is usually not a problem because the data volume is low. I can see it becoming a problem if you had multiple "hotspot" keys, but then you would have a hard time parallelizing the workload anyway.

Does this limitation really impact performance for you, or is the question of a theoretical nature? :) In any case, we could file an issue and allow other partitioners for keyed streams.

Best,
Max
Hi Max,
Thanks for the answer. I needed to ensure that, in a parallel window operation (which relies on a KeyedStream), each partition of the window's output stream contains a single key. I can obtain this by applying a custom partitioner just after the window (a sketch of that workaround follows below), but being able to rely on the partitioner of the KeyedStream itself would avoid that extra transformation. I was just wondering whether there was a particular reason to limit the partitioner of the KeyedStream to a HashPartitioner. I have no bottleneck or performance problems in any case.

Best regards.
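For reference, here is a rough sketch of the workaround described above: re-partitioning the window output with DataStream.partitionCustom so that each downstream parallel instance receives exactly one key. The element type, the key extractor, and the assumption that keys are small non-negative integers are all hypothetical; this is an illustration, not the actual code from the thread.

    // Illustrative sketch: route each key of the window output to its own partition.
    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public class OneKeyPerPartition {

        // Maps each key directly to a channel index; assumes keys are 0..numPartitions-1.
        public static class DirectPartitioner implements Partitioner<Integer> {
            @Override
            public int partition(Integer key, int numPartitions) {
                return key % numPartitions;
            }
        }

        // windowOutput is the stream produced by keyBy(...).window(...).<windowOperation>;
        // the Tuple2<Integer, Long> element type is a made-up example.
        public static DataStream<Tuple2<Integer, Long>> oneKeyPerChannel(
                DataStream<Tuple2<Integer, Long>> windowOutput) {
            return windowOutput.partitionCustom(
                    new DirectPartitioner(),
                    new KeySelector<Tuple2<Integer, Long>, Integer>() {
                        @Override
                        public Integer getKey(Tuple2<Integer, Long> value) {
                            return value.f0; // the key field of the window output
                        }
                    });
        }
    }

The direct key-to-channel mapping only works when the key space is known, small, and dense, which is exactly the situation discussed in this thread, where the number of keys is close to the parallelism.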