
Using CustomPartitionerWrapper with KeyedStream

Posted by Philippe CAPARROY on Aug 11, 2016; 8:53pm
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Using-CustomPartitionerWrapper-with-KeyedStream-tp8481.html

Hi there,

It does not seem possible to use a custom partitioner with a KeyedStream without modifying the KeyedStream class itself:


    protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
        throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
    }

In some situations, for example when the number of keys is small and close to the number of partitions, using keyBy(<keyExtractor>).window(<windowAssigner>).<windowOperation> might result in collisions in the partition indexes (and hence empty partitions) assigned by the HashPartitioner that is imposed on the KeyedStream:

    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
        super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
                dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
        this.keySelector = keySelector;
        this.keyType = keyType;
    }

This is due to the characteristics of the underlying hash function (or of any hash function):

    returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
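To make the collision effect concrete, here is a minimal, self-contained sketch that does not depend on Flink. It only mimics the "hash, then modulo" pattern of the line above. The class name PartitionCollisionSketch, the helper names, and the sample keys are hypothetical, and mix() is a stand-in bit mixer (the MurmurHash3 32-bit finalizer), not Flink's MathUtils.murmurHash, so the exact channel indexes will likely differ from what a real job produces.

```java
import java.util.Set;
import java.util.TreeSet;

public class PartitionCollisionSketch {

    // Stand-in bit mixer (MurmurHash3 32-bit finalizer).
    // Assumption: this is NOT Flink's MathUtils.murmurHash, only a similar kind of mixer.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Channel index for a key: hash the key, then reduce modulo the channel count.
    // floorMod keeps the index non-negative, since mix() may return a negative int.
    static int channelFor(Object key, int numberOfOutputChannels) {
        return Math.floorMod(mix(key.hashCode()), numberOfOutputChannels);
    }

    // Distinct channels that receive at least one of the given keys.
    static Set<Integer> usedChannels(Object[] keys, int numberOfOutputChannels) {
        Set<Integer> used = new TreeSet<>();
        for (Object key : keys) {
            used.add(channelFor(key, numberOfOutputChannels));
        }
        return used;
    }

    public static void main(String[] args) {
        // Hypothetical keys: as many keys as channels.
        Object[] keys = {"sensor-a", "sensor-b", "sensor-c", "sensor-d"};
        int channels = 4;

        Set<Integer> used = usedChannels(keys, channels);
        System.out.println("channels used:  " + used + " of " + channels);
        System.out.println("empty channels: " + (channels - used.size()));
    }
}
```

Whenever two keys land on the same index, at least one channel stays empty, even though there are as many keys as channels. A partitioner that knows the key set in advance could assign each key its own channel, which is what a hash-based assignment cannot guarantee.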

Is there a particular reason to force the KeyedStream to use a HashPartitioner?

Thanks in advance and best regards.