Using CustomPartitionerWrapper with KeyedStream

Philippe CAPARROY
Hi there,

It does not seem possible to use a custom partitioner with a KeyedStream without modifying the KeyedStream class itself, since it rejects any attempt to change the partitioner:


protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
}

In some situations, such as when the number of keys is small and close to the number of partitions, using keyBy(<keyExtractor>).window(<windowAssigner>).<windowOperation> might result in collisions in the partition indexes (and hence empty partitions) assigned by the HashPartitioner that is imposed on the KeyedStream:

public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
this.keySelector = keySelector;
this.keyType = keyType;
}

This is due to the characteristics of the underlying hash function (the same would hold for any hash function):

returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
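To make the collision argument concrete, here is a minimal, Flink-free sketch. The class name, the method names, and the sample keys are hypothetical, and mix() is only a stand-in bit mixer (the 32-bit MurmurHash3 finalizer), not Flink's actual MathUtils.murmurHash. Under an ideal uniform hash, the chance that 4 keys land on 4 distinct channels out of 4 is 4!/4^4, roughly 9.4%, so at least one idle partition is the expected case rather than the exception.

```java
import java.util.HashSet;
import java.util.Set;

/** Sketch: why "hash(key) % numberOfOutputChannels" can leave channels idle. */
final class PartitionCollisionSketch {

    // ASSUMPTION: stand-in mixer (MurmurHash3 32-bit finalizer),
    // not Flink's MathUtils.murmurHash.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Maps a key to a channel index in [0, numberOfOutputChannels).
    // floorMod keeps the index non-negative even when mix() returns a negative int.
    static int selectChannel(Object key, int numberOfOutputChannels) {
        return Math.floorMod(mix(key.hashCode()), numberOfOutputChannels);
    }

    // Counts the distinct channels that receive at least one of the given keys.
    static int channelsUsed(Object[] keys, int numberOfOutputChannels) {
        Set<Integer> used = new HashSet<>();
        for (Object key : keys) {
            used.add(selectChannel(key, numberOfOutputChannels));
        }
        return used.size();
    }

    // Probability that `keys` keys hit pairwise distinct channels,
    // assuming an ideal, uniformly distributed hash.
    static double probabilityAllDistinct(int keys, int channels) {
        if (keys > channels) {
            return 0.0; // pigeonhole: a collision is unavoidable
        }
        double p = 1.0;
        for (int i = 0; i < keys; i++) {
            p *= (double) (channels - i) / channels;
        }
        return p;
    }

    public static void main(String[] args) {
        // Hypothetical keys, chosen only for illustration.
        Object[] keys = {"sensor-a", "sensor-b", "sensor-c", "sensor-d"};
        System.out.println("channels used: " + channelsUsed(keys, 4) + " of 4");
        System.out.println("P(no collision): " + probabilityAllDistinct(4, 4));
    }
}
```

How many channels a given key set actually occupies depends on the concrete hash function, which is why the sketch reports the count instead of assuming it.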

Is there a particular reason to force the KeyedStream to use a HashPartitioner?

Thanks in advance and best regards.

Re: Using CustomPartitionerWrapper with KeyedStream

Maximilian Michels
Hi Philippe,

There is no particular reason other than that hash partitioning is a
sensible default for most users. It seems like this is rarely an
issue. When the number of keys is close to the parallelism, having
idle partitions is usually not a problem because the data volume is
low. I see that it could be a problem if you had multiple "hotspot"
keys, but then you would have a hard time parallelizing the workload
anyway.

Does this limitation really impact performance for you, or is this
question of a theoretical nature? :) In any case, we could file an
issue and allow other partitioners for keyed streams.

Best,
Max


On Thu, Aug 11, 2016 at 10:53 PM, Philippe Caparroy wrote:
> [...]

Re: Using CustomPartitionerWrapper with KeyedStream

Philippe CAPARROY
Hi Max,

Thanks for the answer.
I needed to ensure that, in a parallel window operation (which relies on a KeyedStream), each partition of the window's output stream contains a single key.
I can achieve this by applying a custom partitioner right after the window, but relying on the partitioner of the KeyedStream would avoid that additional transformation.
I was just wondering whether there was a particular reason to restrict the KeyedStream to a HashPartitioner.
I have no bottleneck or performance problems in any case.
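
For illustration, a partitioner that gives every known key its own partition could look roughly like the sketch below. This is not the code I actually use: FixedKeyPartitioner and the sample keys are hypothetical, and the nested Partitioner interface is only a local stand-in with the same shape as Flink's org.apache.flink.api.common.functions.Partitioner<K>, so that the snippet compiles without Flink on the classpath.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: a partitioner that assigns each known key its own partition index. */
final class OneKeyPerPartitionSketch {

    // ASSUMPTION: local stand-in with the same shape as Flink's Partitioner<K>.
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Hypothetical partitioner: keys are numbered in the order they are given,
    // so two distinct keys never share a partition index.
    static final class FixedKeyPartitioner<K> implements Partitioner<K> {
        private final Map<K, Integer> indexByKey = new LinkedHashMap<>();

        FixedKeyPartitioner(Iterable<K> knownKeys) {
            int next = 0;
            for (K key : knownKeys) {
                if (!indexByKey.containsKey(key)) {
                    indexByKey.put(key, next++);
                }
            }
        }

        @Override
        public int partition(K key, int numPartitions) {
            Integer index = indexByKey.get(key);
            if (index == null) {
                throw new IllegalArgumentException("Unknown key: " + key);
            }
            if (index >= numPartitions) {
                throw new IllegalStateException("More keys than partitions");
            }
            return index;
        }
    }

    public static void main(String[] args) {
        Partitioner<String> partitioner =
                new FixedKeyPartitioner<>(Arrays.asList("a", "b", "c"));
        for (String key : Arrays.asList("a", "b", "c")) {
            System.out.println(key + " -> partition " + partitioner.partition(key, 3));
        }
    }
}
```

In a real job the same logic would implement Flink's own Partitioner<K> and be applied to the window's output with partitionCustom(partitioner, keySelector). The approach only works when the key set is known up front and is no larger than the parallelism.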

Best regards.

> On 12 August 2016, at 12:06, Maximilian Michels wrote:
> [...]