Re-keying / sub-keying a stream without repartitioning


Elias Levy
This has come up on the list before, but in a different context.  I need to rekey a stream, but would prefer that the stream not be repartitioned.  Repartitioning gains nothing here: the new key is a composite that extends the existing stream key, going from a key of A to a key of (A, B), so all values for the resulting streams are already routed to the same node.  Repartitioning them to other nodes would only generate unnecessary network traffic and serde overhead.

Unlike previous use cases, I am not trying to perform aggregate operations.  Instead I am executing CEP patterns.  Some patterns apply to the stream keyed by A and some to the stream keyed by (A,B).

The API does not appear to offer an obvious solution to this situation. keyBy() will repartition, and there isn't something like subKey() to subpartition a stream without repartitioning (e.g. keyBy(A).subKey(B)).

I suppose I could accomplish it by using partitionCustom(), ignoring the second element in the key, and delegating to the default partitioner passing it only the first element, thus resulting in no change of task assignment.
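
To make the idea concrete, here is a minimal sketch in plain Java with no Flink dependency. The class and method names are hypothetical, and defaultPartition() is only a stand-in for whatever the default partitioner does with a key; it is not Flink's actual hashing. The point is only that a partitioner for (A, B) that looks at A alone cannot change the task assignment.

```java
public class SubKeyRouting {
    // Stand-in for the default partitioner (assumption: NOT Flink's real hash).
    public static int defaultPartition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Custom partitioner for the composite key (A, B): ignore B and
    // delegate to the default partitioner with A only.
    public static int partitionBySuperKey(String a, String b, int numPartitions) {
        return defaultPartition(a, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        int forA = defaultPartition("user-1", numPartitions);
        for (String b : new String[] {"x", "y", "z"}) {
            int forAB = partitionBySuperKey("user-1", b, numPartitions);
            // Every (user-1, b) lands on the partition that already owns user-1.
            System.out.println("(user-1, " + b + ") -> " + forAB + ", user-1 -> " + forA);
        }
    }
}
```

One caveat: as far as I can tell, partitionCustom() yields a plain DataStream rather than a KeyedStream, so by itself it would not give me keyed state scoped to (A, B).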

Thoughts?

Re: Re-keying / sub-keying a stream without repartitioning

Elias Levy
Anyone?

On Fri, Apr 21, 2017 at 10:15 PM, Elias Levy <[hidden email]> wrote:
[...]


Re: Re-keying / sub-keying a stream without repartitioning

Aljoscha Krettek
Hi Elias,
sorry for the delay, this must have slipped through the cracks after Flink Forward.

I did spend some time thinking about this, and for a while now we have had the idea of adding an operation like “keyByWithoutPartitioning()” (name not final ;-)) that would allow the user to tell the system that it doesn’t have to do a reshuffle. This would work if the key type (and the keys) stayed exactly the same.

I think it wouldn’t work for your case because the key type changes and elements for key (A, B) would normally be reshuffled to different instances than with key (A), i.e. (1, 1) does not belong to the same key-group as (1). Would you agree that this happens in your case?
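
A small sketch of why the two keys end up in different places. It is a simplified model under stated assumptions: Flink's real assignment, as far as I know, also runs key.hashCode() through a murmur hash before taking the modulus, and a java.util.List stands in here for the composite key type. The class name, method names, and the parallelism values are made up for the illustration.

```java
import java.util.Arrays;

public class KeyGroupDemo {
    // Simplified key-group assignment: hash of the key modulo max parallelism.
    // (Assumption: the extra murmur-hash step of the real system is left out.)
    public static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key groups are split into contiguous ranges, one range per parallel subtask.
    public static int subtask(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;

        Object keyA = 1;                     // key (1)
        Object keyAB = Arrays.asList(1, 1);  // key (1, 1), stand-in composite type

        int groupA = keyGroup(keyA, maxParallelism);    // 1
        int groupAB = keyGroup(keyAB, maxParallelism);  // 993 mod 128 = 97

        System.out.println("(1)    -> key group " + groupA
            + ", subtask " + subtask(groupA, parallelism, maxParallelism));
        System.out.println("(1, 1) -> key group " + groupAB
            + ", subtask " + subtask(groupAB, parallelism, maxParallelism));
    }
}
```

In this model (1) falls into key group 1 on subtask 0, while (1, 1) falls into key group 97 on subtask 3, because the hash of the composite key is unrelated to the hash of its first field.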

Best,
Aljoscha 

On 25. Apr 2017, at 23:32, Elias Levy <[hidden email]> wrote:

Anyone?




Re: Re-keying / sub-keying a stream without repartitioning

Elias Levy

On Wed, Apr 26, 2017 at 5:11 AM, Aljoscha Krettek <[hidden email]> wrote:
I did spend some time thinking about this, and for a while now we have had the idea of adding an operation like “keyByWithoutPartitioning()” (name not final ;-)) that would allow the user to tell the system that it doesn’t have to do a reshuffle. This would work if the key type (and the keys) stayed exactly the same.

I think it wouldn’t work for your case because the key type changes and elements for key (A, B) would normally be reshuffled to different instances than with key (A), i.e. (1, 1) does not belong to the same key-group as (1). Would you agree that this happens in your case?

It happens if I use keyBy().  But there is no need for it to happen, which is why I was asking about rekeying without repartitioning.  The stream is already partitioned by A, so all elements of a new stream keyed by (A,B) are already being processed by the local task.  Reshuffling as a result of rekeying would have no benefit and would double the network traffic.  That is why I suggested that subKey(B) may be a good way to clearly indicate that the new key just sub-partitions the existing key partition without requiring reshuffling.

Why would you not be able to use a different key type with keyByWithoutPartitioning()?
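
To illustrate what I mean by sub-partitioning, here is a rough sketch in plain Java of a task that already owns all elements for some set of A keys and scopes additional state by B locally. Everything here is hypothetical (class name, method names, counters standing in for pattern state); it is not Flink API, just a picture of the semantics I would expect from something like subKey(B).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class LocalSubKeying {
    // State scoped by A alone (what patterns keyed by A would see).
    private final Map<String, Integer> perA = new HashMap<>();
    // State scoped by (A, B), nested under A so it stays in the same task.
    private final Map<String, Map<String, Integer>> perAB = new HashMap<>();

    // Called for every element that the upstream keying by A routed to this task.
    public void process(String a, String b) {
        perA.merge(a, 1, Integer::sum);
        perAB.computeIfAbsent(a, k -> new HashMap<>()).merge(b, 1, Integer::sum);
    }

    public int countForA(String a) {
        return perA.getOrDefault(a, 0);
    }

    public int countForAB(String a, String b) {
        return perAB.getOrDefault(a, Collections.emptyMap()).getOrDefault(b, 0);
    }

    public static void main(String[] args) {
        LocalSubKeying task = new LocalSubKeying();
        task.process("a1", "b1");
        task.process("a1", "b2");
        task.process("a1", "b1");
        // Both scopes are served by the same task, with no second shuffle.
        System.out.println("a1: " + task.countForA("a1"));
        System.out.println("(a1, b1): " + task.countForAB("a1", "b1"));
        System.out.println("(a1, b2): " + task.countForAB("a1", "b2"));
    }
}
```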