Migrate custom partitioner from Flink 1.7 to Flink 1.9

Migrate custom partitioner from Flink 1.7 to Flink 1.9

Salva Alcántara
I am trying to migrate a custom dynamic partitioner from Flink 1.7 to Flink
1.9. The original partitioner implemented the `selectChannels` method within
the `StreamPartitioner` interface like this:

```java
// Original: working for Flink 1.7
//@Override
public int[] selectChannels(
        SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate,
        int numberOfOutputChannels) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        // send to all channels
        int[] channels = new int[numberOfOutputChannels];
        for (int i = 0; i < numberOfOutputChannels; ++i) {
            channels[i] = i;
        }
        return channels;
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfOutputChannels);
    } else {
        returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfOutputChannels);
    }
    return returnChannels;
}
```

I am not sure how to migrate this to Flink 1.9, since the
`StreamPartitioner` interface has changed as illustrated below:


```java
// New: required by Flink 1.9
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        /*
        It is illegal to call this method for broadcast channel selectors and
        this method can remain not implemented in that case (for example by
        throwing UnsupportedOperationException).
        */
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfChannels);
    } else {
        returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
    }
    return returnChannels[0];
}
```

Note that `selectChannels` has been replaced with `selectChannel`, which returns a single channel index. It is therefore no longer possible to return multiple output channels, as the original code does for broadcast elements. In fact, `selectChannel` should not be invoked at all in that case.
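For reference, the 1.9 `ChannelSelector` interface that `StreamPartitioner` implements looks roughly like this (paraphrased from the 1.9 sources; broadcast is now a static property of the selector rather than a per-record decision):

```java
public interface ChannelSelector<T> {

    // Called once with the number of output channels, before any records flow.
    void setup(int numberOfChannels);

    // Returns the single channel the record is sent to; must not be
    // called on a selector whose isBroadcast() returns true.
    int selectChannel(T record);

    // True if every record is sent to every output channel.
    boolean isBroadcast();
}
```

Any thoughts on how to tackle this?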





Re: Migrate custom partitioner from Flink 1.7 to Flink 1.9

Chesnay Schepler
You should be able to implement this at the DataStream API level using `DataStream#broadcast` and `#union`, like this:

```java
input = ...

singleChannel = input.filter(x -> !x.f0.isBroadCastPartitioning());

broadcastChannel = input.filter(x -> x.f0.isBroadCastPartitioning());

result = broadcastChannel.broadcast().union(singleChannel);

// apply operations on result
```
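With broadcast handled at the API level, the broadcast branch can simply be dropped from the partitioner. A minimal sketch of the remaining 1.9 `selectChannel`, reusing the `random` and `partitioner` fields from the original question (`numberOfChannels` is the field inherited from `StreamPartitioner`):

```java
// Sketch only: broadcast elements never reach this partitioner, because
// they are routed via DataStream#broadcast before the union.
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.getPartitionKey() == -1) {
        // random partition
        return random.nextInt(numberOfChannels);
    }
    return partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
}
```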



Re: Migrate custom partitioner from Flink 1.7 to Flink 1.9

Salva Alcántara
Thanks Chesnay! Just to be clear, this is how my current code looks:

```
unionChannel = broadcastChannel.broadcast().union(singleChannel)

result = new DataStream<>(
    unionChannel.getExecutionEnvironment(),
    new PartitionTransformation<>(unionChannel.getTransformation(), new MyDynamicPartitioner())
)
```

The problem when migrating to Flink 1.9 is that `MyDynamicPartitioner` cannot handle broadcast elements, as explained in the question. So, based on your reply, I guess I could do something like this:

```
resultSingleChannel = new DataStream<>(
    singleChannel.getExecutionEnvironment(),
    new PartitionTransformation<>(singleChannel.getTransformation(), new MyDynamicPartitioner())
)

result = broadcastChannel.broadcast().union(resultSingleChannel)
```

I will give it a try and see if it works.




Re: Migrate custom partitioner from Flink 1.7 to Flink 1.9

Arvid Heise-3
Hi Salva,

I already answered on SO [1], but I'll replicate it here:

With Flink 1.9, you cannot dynamically broadcast to all channels anymore. Your `StreamPartitioner` has to declare statically whether it is a broadcast partitioner, via `isBroadcast()`. If it is, `selectChannel` is never invoked.
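For comparison, this is roughly how the built-in `BroadcastPartitioner` handles it in 1.9 (paraphrased from the Flink sources):

```java
// Broadcast is declared statically ...
@Override
public boolean isBroadcast() {
    return true;
}

// ... so selectChannel is never called for this partitioner, and only throws.
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    throw new UnsupportedOperationException(
            "Broadcast partitioner does not support select channel.");
}
```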

Do you have a specific use case where you'd need to switch dynamically?

Best,

Arvid

[1] https://stackoverflow.com/questions/59485064/migrating-custom-dynamic-partitioner-from-flink-1-7-to-flink-1-9

