Using CustomPartitionerWrapper with KeyedStream
Posted by Philippe CAPARROY on Aug 11, 2016; 8:53pm
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Using-CustomPartitionerWrapper-with-KeyedStream-tp8481.html
Hi there,
It does not seem possible to use a custom partitioner with a KeyedStream without modifying KeyedStream itself, since it overrides setConnectionType() to reject any other partitioner:
protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
    throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
}
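For illustration, here is a minimal sketch of the kind of partitioner one might want to plug in instead: it gives each key from a small, known key set its own channel, so distinct keys never share a partition as long as there are no more keys than partitions. `SimplePartitioner`, `KnownKeysPartitioner` and the key names are hypothetical; the method shape only mirrors Flink's `Partitioner<K>#partition(K key, int numPartitions)`, and the sketch does not call any Flink API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IndexPartitionerSketch {

    // Hypothetical stand-in for a custom partitioner interface. The method
    // shape mirrors Flink's Partitioner<K>#partition(K key, int numPartitions),
    // but this interface is defined here only for illustration.
    interface SimplePartitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Assigns each key of a small, known key set a dense index (0, 1, 2, ...),
    // so distinct known keys land on distinct channels when keys <= channels.
    static final class KnownKeysPartitioner<K> implements SimplePartitioner<K> {
        private final Map<K, Integer> indexByKey = new HashMap<>();

        KnownKeysPartitioner(List<K> knownKeys) {
            for (K key : knownKeys) {
                // size() is evaluated before the insert, giving 0, 1, 2, ...
                indexByKey.putIfAbsent(key, indexByKey.size());
            }
        }

        @Override
        public int partition(K key, int numPartitions) {
            Integer index = indexByKey.get(key);
            if (index == null) {
                // Unknown keys fall back to plain hashing (non-negative modulo).
                return Math.floorMod(key.hashCode(), numPartitions);
            }
            return index % numPartitions;
        }
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d");
        KnownKeysPartitioner<String> partitioner = new KnownKeysPartitioner<>(keys);
        for (String key : keys) {
            System.out.println(key + " -> " + partitioner.partition(key, 4));
        }
    }
}
```

On a plain DataStream, Flink's partitionCustom(...) accepts a partitioner of this shape, but its result is an ordinary DataStream rather than a KeyedStream, so keyed windows cannot be applied to it directly.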
In some situations, for example when the number of distinct keys is small and close to the number of partitions, a pipeline such as keyBy(<keyExtractor>).window(<windowAssigner>).<windowOperation>
may produce collisions in the partition indexes assigned by the HashPartitioner that KeyedStream imposes, and hence empty partitions:
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
    super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
            dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
    this.keySelector = keySelector;
    this.keyType = keyType;
}
This follows from the nature of the underlying hash function, and would happen with any hash function:
returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
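To put a number on the collision effect, the sketch below idealizes murmurHash(key.hashCode()) % numberOfOutputChannels as sending each key to a uniformly random channel, independently of the other keys (an assumption, not a model of Flink's actual code). Under that assumption it computes the birthday-problem probability that k keys all land on distinct channels, n!/((n-k)! n^k), and the expected number of empty channels, n(1 - 1/n)^k. The class and method names are hypothetical.

```java
public class HashCollisionOdds {

    // Probability that `keys` keys, each hashed independently and uniformly
    // into `channels` channels, all land on distinct channels:
    // channels! / (channels - keys)! / channels^keys.
    static double probabilityAllDistinct(int keys, int channels) {
        if (keys > channels) {
            return 0.0; // pigeonhole: at least two keys must share a channel
        }
        double p = 1.0;
        for (int i = 0; i < keys; i++) {
            // The i-th key must avoid the i channels already taken.
            p *= (double) (channels - i) / channels;
        }
        return p;
    }

    // Expected number of channels that receive no key at all:
    // channels * (1 - 1/channels)^keys.
    static double expectedEmptyChannels(int keys, int channels) {
        return channels * Math.pow(1.0 - 1.0 / channels, keys);
    }

    public static void main(String[] args) {
        int channels = 4;
        for (int keys = 1; keys <= 6; keys++) {
            System.out.printf("keys=%d channels=%d P(no collision)=%.4f E[empty channels]=%.4f%n",
                    keys, channels,
                    probabilityAllDistinct(keys, channels),
                    expectedEmptyChannels(keys, channels));
        }
    }
}
```

For 4 keys and 4 channels, the probability that every channel receives exactly one key is 24/256 (about 0.094), and on average about 1.27 of the 4 channels stay empty, which matches the kind of imbalance described above.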
Is there a particular reason to force the KeyedStream to use a HashPartitioner?
Thanks in advance and best regards.