keyBy doesn't evenly distribute keys


Dongwon Kim-2
Hi all,

I want keyBy() to distribute records evenly over operator subtasks, especially when the number of keys is small.

I ran a test program (included below) with varying numbers of keys and the parallelism set to 5.
The keys were assigned to subtasks as follows:
- 5 keys over 5 subtasks  : 3, 1, 1, 0, 0 keys per subtask
- 10 keys over 5 subtasks : 4, 3, 1, 1, 1 keys per subtask
- 20 keys over 5 subtasks : 6, 5, 4, 3, 2 keys per subtask
- 30 keys over 5 subtasks : 8, 7, 6, 6, 3 keys per subtask
- 40 keys over 5 subtasks : 11, 10, 8, 3, 3 keys per subtask
- 50 keys over 5 subtasks : 13, 11, 10, 9, 7 keys per subtask
I repeated the test for each setting and found that the key assignment is deterministic when the number of keys and the number of subtasks are fixed.

I managed to get an even distribution with partitionCustom().
However, what I really want is a keyed stream, so that I can apply a window function over a sliding window (not shown in the code below).
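
For reference, the partitioner I tried (commented out in the test code) sends each key to subtask `key % numPartitions`. The following is only a minimal standalone sketch of that rule, run outside Flink, to show the per-subtask key counts it gives for the consecutive keys 1..N used in the test. The object name `ModuloDistribution` and the helper `keysPerSubtask` are hypothetical names made up for this illustration, and the sketch assumes non-negative integer keys.

```scala
// Standalone sketch (no Flink needed): counts how many distinct keys land on
// each subtask when partitioning with `key % numPartitions`, the same rule as
// the commented-out Partitioner in the test code.
object ModuloDistribution {

  // Hypothetical helper: element i of the result is the number of keys
  // assigned to subtask i. Assumes keys are the integers 1..numberOfKeys.
  def keysPerSubtask(numberOfKeys: Int, parallelism: Int): Seq[Int] = {
    val counts = Array.fill(parallelism)(0)
    (1 to numberOfKeys).foreach { key =>
      counts(key % parallelism) += 1
    }
    counts.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Same key counts as in the keyBy() experiment above.
    Seq(5, 10, 20, 30, 40, 50).foreach { n =>
      println(s"$n keys over 5 subtasks : ${keysPerSubtask(n, 5).mkString(", ")}")
    }
  }
}
```

With consecutive keys and a key count that is a multiple of the parallelism, every subtask gets the same number of keys under this rule, which is the distribution I would like keyBy() to produce as well.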

I found that Stephan once suggested generating a special key to be used by keyBy(), as shown in http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/keyBy-using-custom-partitioner-td5379.html,
but I cannot find any example of it.
How can I generate such a special key so that keys are distributed evenly across operator subtasks?

Otherwise, is there another way of evenly distributing keys?

- Dongwon Kim

---- test code -----
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.collection.mutable

case class Data(time: Int)

object FlinkTest extends App {
  val parallelism = 5
  val numberOfKeys = 5 // 5, 10, 20, 30, 40, 50
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(parallelism)

  env
    .addSource(
      new RichSourceFunction[(Int, Data)] {
        // cancel() is called from a different thread than run(), so the flag must be volatile.
        @volatile var running = false
        override def open(parameters: Configuration): Unit = { running = true }
        override def run(ctx: SourceContext[(Int, Data)]): Unit = {
          val iter = (1 to 10000).iterator
          while (running && iter.hasNext) {
            val t = iter.next
            (1 to numberOfKeys) foreach { key =>
              ctx.collect((key, new Data(t)))
            }
          }
          running = false
        }
        override def cancel(): Unit = { running = false }
      }
    )
    .keyBy(0)
    //    .partitionCustom(
    //      new Partitioner[Int](){
    //        override def partition(key: Int, numPartitions: Int): Int = key % numPartitions
    //      },
    //      _._1
    //    )
    .addSink(
      new RichSinkFunction[(Int, Data)] {

        // Number of records received per key by this sink subtask.
        var counts: mutable.HashMap[Int, Int] = _

        override def open(parameters: Configuration): Unit = {
          counts = new mutable.HashMap()
        }

        override def invoke(record: (Int, Data)): Unit = {
          val key = record._1
          val cnt = counts.getOrElseUpdate(key, 0)
          counts.update(key, cnt + 1)
        }

        // Prints how many distinct keys this subtask received, plus the per-key record counts.
        override def close(): Unit = {
          println(s"close : ${counts.size} $counts")
        }
      }
    )
  env.execute()
}