
Using CustomPartitionerWrapper with KeyedStream


Philippe CAPARROY
Hi there,

It does not seem possible to use a custom partitioner with a KeyedStream without modifying KeyedStream itself:


// KeyedStream overrides setConnectionType() so that its partitioner cannot be replaced
protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
    throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
}

In some situations, for example when the number of keys is small and close to the number of partitions, keyBy(<keyExtractor>).window(<windowAssigner>).<windowOperation> might produce collisions in the partition indexes (and hence empty partitions), because KeyedStream always imposes a HashPartitioner:

// The KeyedStream constructor always wraps its input in a HashPartitioner on the key
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
    super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
            dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
    this.keySelector = keySelector;
    this.keyType = keyType;
}

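This is due to the characteristics of the underlying hash function (any hash function has them):

// HashPartitioner: the output channel for a record is derived from the hash of its key
returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;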
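To put a number on the collision effect described above, here is a small sketch that assumes an idealized hash spreading keys uniformly and independently over the channels (real functions such as murmur only approximate this). It computes the probability that k keys all land on distinct channels (the birthday problem) and the expected number of channels that receive no key at all. The class and method names are hypothetical, chosen for illustration only.

```java
// Partition usage under an idealized uniform hash (an assumption: real hash
// functions such as murmur only approximate uniform, independent placement).
public class HashCollisionOdds {

    // Probability that `keys` keys land on `keys` distinct partitions out of `partitions`:
    // n/n * (n-1)/n * ... * (n-k+1)/n, i.e. the birthday problem.
    static double probAllDistinct(int keys, int partitions) {
        if (keys > partitions) {
            return 0.0; // pigeonhole: at least two keys must share a partition
        }
        double p = 1.0;
        for (int i = 0; i < keys; i++) {
            p *= (double) (partitions - i) / partitions;
        }
        return p;
    }

    // Expected number of partitions that receive no key:
    // each partition is missed by a single key with probability (1 - 1/n).
    static double expectedEmpty(int keys, int partitions) {
        return partitions * Math.pow(1.0 - 1.0 / partitions, keys);
    }

    public static void main(String[] args) {
        int[][] cases = {{4, 4}, {8, 8}, {16, 16}, {3, 8}};
        for (int[] c : cases) {
            System.out.printf("keys=%d partitions=%d  P(no collision)=%.4f  E[empty]=%.3f%n",
                    c[0], c[1], probAllDistinct(c[0], c[1]), expectedEmpty(c[0], c[1]));
        }
    }
}
```

With 4 keys and a parallelism of 4, for example, the chance that every partition receives exactly one key is 4!/4^4 = 9.375%, and on average about 1.27 of the 4 partitions stay empty.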

Is there a particular reason to force the KeyedStream to use a HashPartitioner?

Thanks in advance and best regards.





 



Re: Using CustomPartitionerWrapper with KeyedStream

Maximilian Michels
Hi Philippe,

There is no particular reason other than that hash partitioning is a
sensible default for most users. This rarely seems to be an
issue. When the number of keys is close to the parallelism, having
idle partitions is usually not a problem because the data volume is low. I see
that it could be a problem if you had multiple "hotspot" keys, but then
you would have a hard time parallelizing the workload anyway.

Does this limitation really impact performance for you, or is this
question of a theoretical nature? :) In any case, we could file an issue
and allow other partitioners for keyed streams.

Best,
Max


(In reply to Philippe Caparroy, Thu, Aug 11, 2016 at 10:53 PM)


Re: Using CustomPartitionerWrapper with KeyedStream

Philippe CAPARROY
Hi Max,

Thanks for the answer.
I needed to ensure that, in the output stream of a parallel window operation (which relies on a KeyedStream), each partition contains a single key.
I can achieve this with a custom partitioner applied right after the window, but being able to set the partitioner of the KeyedStream itself would avoid that extra transformation.
I was just wondering whether there was a particular reason to restrict the KeyedStream's partitioner to a HashPartitioner.
In any case, I have no bottleneck or performance problems.
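A minimal sketch of the workaround described here (a custom partitioner placed after the window), assuming the set of keys is known in advance. The interface below is redeclared with the same method shape as Flink's `org.apache.flink.api.common.functions.Partitioner<K>` (`int partition(K key, int numPartitions)`) so the example compiles without Flink; `IndexPartitioner` and the key list are hypothetical. In a Flink job, such a partitioner would be passed to `DataStream#partitionCustom(partitioner, keySelector)` on the window's output stream.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OneKeyPerPartition {

    // Same method shape as Flink's Partitioner<K>; redeclared here (assumption)
    // so the sketch compiles with no Flink dependency on the classpath.
    interface Partitioner<K> extends java.io.Serializable {
        int partition(K key, int numPartitions);
    }

    // Hypothetical partitioner: each key in a fixed, known key set gets its own index,
    // so no two known keys share a partition while keys.size() <= numPartitions.
    static class IndexPartitioner implements Partitioner<String> {
        private final HashMap<String, Integer> index = new HashMap<>();

        IndexPartitioner(List<String> knownKeys) {
            for (int i = 0; i < knownKeys.size(); i++) {
                index.put(knownKeys.get(i), i);
            }
        }

        @Override
        public int partition(String key, int numPartitions) {
            Integer i = index.get(key);
            if (i == null) {
                // Unknown key: fall back to hashing, which may collide with other keys.
                return Math.floorMod(key.hashCode(), numPartitions);
            }
            return i % numPartitions;
        }
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d");
        IndexPartitioner p = new IndexPartitioner(keys);
        for (String k : keys) {
            System.out.println(k + " -> " + p.partition(k, 4));
        }
    }
}
```

Because the mapping is an explicit table rather than a hash, distinct known keys cannot collide as long as there are no more keys than partitions; keys outside the table fall back to hashing and lose that guarantee.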

Best regards.

(In reply to Maximilian Michels, 12 August 2016 at 12:06)