I am trying to migrate a custom dynamic partitioner from Flink 1.7 to Flink
1.9. The original partitioner implemented the `selectChannels` method of the
`StreamPartitioner` interface like this:
```java
// Original: working for Flink 1.7
@Override
public int[] selectChannels(
        SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate,
        int numberOfOutputChannels) {
    // returnChannels (a reusable int[]), random and partitioner are instance
    // fields of the partitioner class; their declarations are not shown here.
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        // send to all channels
        int[] channels = new int[numberOfOutputChannels];
        for (int i = 0; i < numberOfOutputChannels; ++i) {
            channels[i] = i;
        }
        return channels;
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfOutputChannels);
    } else {
        // partition by key
        returnChannels[0] =
                partitioner.partition(value.f0.getPartitionKey(), numberOfOutputChannels);
    }
    return returnChannels;
}
```
I am not sure how to migrate this to Flink 1.9, since the
`StreamPartitioner` interface has changed as illustrated below:
```java
// New: required by Flink 1.9
@Override
public int selectChannel(
        SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate) {
    // In 1.9, numberOfChannels is a protected field of StreamPartitioner,
    // set via setup(int) instead of being passed into this method.
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        /*
         * It is illegal to call this method for broadcast channel
         * selectors and this method can remain not implemented
         * in that case (for example by throwing
         * UnsupportedOperationException).
         */
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfChannels);
    } else {
        // partition by key
        returnChannels[0] =
                partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
    }
    return returnChannels[0];
}
```
Note that `selectChannels` has been replaced by `selectChannel`, which returns a
single channel index, so it is no longer possible to return multiple output
channels for broadcast elements as the original code did. In fact,
`selectChannel` should not be invoked at all in that case. Any thoughts on how
to tackle this?
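For reference, this is roughly the 1.9 `ChannelSelector` contract that
`StreamPartitioner` implements, paraphrased from my reading of the 1.9 sources
(the comments are my own summary, not the exact javadoc):
```java
import org.apache.flink.core.io.IOReadableWritable;

// Paraphrased sketch of the Flink 1.9 ChannelSelector interface, which
// StreamPartitioner implements.
public interface ChannelSelector<T extends IOReadableWritable> {

    // Called once with the number of output channels, replacing the old
    // numberOfOutputChannels argument of selectChannels(); StreamPartitioner
    // stores it in its protected numberOfChannels field.
    void setup(int numberOfChannels);

    // Returns exactly one logical channel index for the given record.
    // Must not be called on broadcast channel selectors.
    int selectChannel(T record);

    // Broadcast selectors return true here; for those, selectChannel() may
    // remain unimplemented (e.g. throw UnsupportedOperationException).
    boolean isBroadcast();
}
```
Since `isBroadcast()` is a property of the whole partitioner rather than of an
individual record, it is not obvious how to keep the per-record broadcast
behaviour, which is what prompts the question.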