Hi Darshan, thanks for raising this problem. We have a similar use of rebalancing in Flink SQL, where we want to rebalance the Kafka input across more partitions to increase parallelism in streaming jobs.
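For context, rebalancing distributes records round-robin across all downstream subtasks, so a topic with fewer partitions than the job's parallelism can still keep every subtask busy. Below is a plain-Java toy model of that idea, not Flink's implementation. The class and method names are hypothetical. It assumes each upstream partition keeps its own round-robin counter. Every counter starts at subtask 0 for determinism, while Flink's partitioner may start at a different subtask.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinRebalance {

    // Toy model of round-robin rebalancing (hypothetical helper, not Flink code):
    // each upstream partition cycles through all downstream subtasks, sending one
    // record to each in turn. Every counter starts at subtask 0 for determinism.
    static List<List<String>> rebalance(List<List<String>> upstreamPartitions,
                                        int downstreamParallelism) {
        List<List<String>> downstream = new ArrayList<>();
        for (int i = 0; i < downstreamParallelism; i++) {
            downstream.add(new ArrayList<>());
        }
        for (List<String> partition : upstreamPartitions) {
            int next = 0; // per-upstream round-robin counter
            for (String record : partition) {
                downstream.get(next).add(record);
                next = (next + 1) % downstreamParallelism;
            }
        }
        return downstream;
    }

    public static void main(String[] args) {
        // Hypothetical input: a Kafka topic with only 2 partitions, read by a job
        // whose downstream operators run with parallelism 4.
        List<List<String>> kafkaPartitions = List.of(
                List.of("a0", "a1", "a2", "a3"),
                List.of("b0", "b1", "b2", "b3"));
        List<List<String>> subtasks = rebalance(kafkaPartitions, 4);
        for (int i = 0; i < subtasks.size(); i++) {
            System.out.println("subtask " + i + ": " + subtasks.get(i));
        }
    }
}
```

In this example, each of the four downstream subtasks receives two records, even though only two partitions feed the job.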
As Fabian suggests, rebalancing is not part of relational algebra. The closest equivalent I can find in databases is Vertica's REBALANCE_TABLE, but it is used more as a one-time operation to redistribute data after adding or removing nodes. On the data processing side, however, I think this deserves more attention, because we cannot easily modify the input data source, e.g. the number of Kafka partitions.
The closest approach I can think of to enable this feature is a DDL statement in SQL, e.g. something like ALTER TABLE REBALANCE. This statement would cause a rebalance() call when StreamTableSource.getDataStream or BatchTableSource.getDataSet is invoked. In that case, we don't need to touch the parser or planner in Calcite. For the Table API, a possible solution would be to add a rebalance() method, but we would need to close out the LogicalPlan every time rebalance() is called, so that we don't need to touch the Calcite planner.
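To make the SQL DDL idea a bit more concrete, here is a self-contained toy model in plain Java. Everything in it is hypothetical and is not a Flink or Calcite API, including the Catalog class, the token-based handling of ALTER TABLE &lt;name&gt; REBALANCE, and the getDataStream stand-in. It illustrates the intended division of labor. The DDL statement only records metadata about the table. The rebalance step is applied when the source's records are materialized, so the planner never sees a new operator.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RebalanceDdlSketch {

    // Hypothetical catalog: it only remembers which tables were marked by
    // "ALTER TABLE <name> REBALANCE"; no plan or operator is created here.
    static final class Catalog {
        private final Set<String> rebalanced = new HashSet<>();

        // Toy statement handling by token splitting, for illustration only.
        void execute(String ddl) {
            String[] tokens = ddl.trim().split("\\s+");
            if (tokens.length == 4
                    && tokens[0].equalsIgnoreCase("ALTER")
                    && tokens[1].equalsIgnoreCase("TABLE")
                    && tokens[3].equalsIgnoreCase("REBALANCE")) {
                rebalanced.add(tokens[2]);
            } else {
                throw new IllegalArgumentException("unsupported statement: " + ddl);
            }
        }

        boolean isRebalanced(String table) {
            return rebalanced.contains(table);
        }
    }

    // Stand-in for StreamTableSource.getDataStream: returns records grouped by the
    // subtask that processes them. If the table was marked, a round-robin
    // rebalance step is applied before the records reach downstream operators.
    static List<List<String>> getDataStream(Catalog catalog, String table,
                                            List<List<String>> sourcePartitions,
                                            int parallelism) {
        if (!catalog.isRebalanced(table)) {
            return sourcePartitions; // records stay on the subtasks that read them
        }
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            out.add(new ArrayList<>());
        }
        for (List<String> partition : sourcePartitions) {
            int next = 0; // per-partition round-robin counter
            for (String record : partition) {
                out.get(next).add(record);
                next = (next + 1) % parallelism;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Catalog catalog = new Catalog();
        // Hypothetical table "orders" backed by a 2-partition Kafka topic.
        List<List<String>> kafka = List.of(
                List.of("a0", "a1", "a2", "a3"),
                List.of("b0", "b1", "b2", "b3"));
        System.out.println(getDataStream(catalog, "orders", kafka, 4));
        catalog.execute("ALTER TABLE orders REBALANCE");
        System.out.println(getDataStream(catalog, "orders", kafka, 4));
    }
}
```

Running main prints the records grouped per subtask before and after the table is marked: two populated groups first, then four.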