Hi all,

I want keyBy() to distribute records evenly over operator subtasks, especially when there is only a small number of keys. I ran a test program (see below if interested) with varying numbers of keys and the parallelism set to 5. The resulting key assignment to subtasks is as follows:

- 5 keys over 5 subtasks: the subtasks receive 3, 1, 1, 0, 0 keys, respectively
- 10 keys over 5 subtasks: the subtasks receive 4, 3, 1, 1, 1 keys, respectively
- 20 keys over 5 subtasks: the subtasks receive 6, 5, 4, 3, 2 keys, respectively
- 30 keys over 5 subtasks: the subtasks receive 8, 7, 6, 6, 3 keys, respectively
- 40 keys over 5 subtasks: the subtasks receive 11, 10, 8, 3, 3 keys, respectively
- 50 keys over 5 subtasks: the subtasks receive 13, 11, 10, 9, 7 keys, respectively

I repeated the test for each setting and found that the key assignment is deterministic when the number of keys and the number of subtasks are fixed.

I can get an even distribution with partitionCustom(), but what I really want is a keyed stream, so that I can apply a window function over a sliding window (not shown in the code below).

I found that Stephan once suggested generating a special key to be used by keyBy(), as shown in http://apache-flink- [link truncated in the archive], but I cannot find any example of this. How can I generate such a special key so that keys are evenly distributed over operator subtasks? Alternatively, is there another way to distribute keys evenly?
// - Dongwon Kim
//
// ---- test code -----
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.collection.mutable

/** One generated event; `time` is the iteration (1 to 10000) in which it was emitted. */
case class Data(time: Int)

/**
 * Emits `(key, Data(t))` for every key in `1 to numberOfKeys`, once per iteration
 * `t = 1 to 10000`, stopping early if the source is cancelled.
 *
 * @param numberOfKeys number of distinct keys emitted in each iteration
 */
final class KeySource(numberOfKeys: Int) extends RichSourceFunction[(Int, Data)] {

  // cancel() writes this flag while run() loops on it; Flink invokes cancel()
  // from a different thread than run(), so the flag must be @volatile for the
  // loop to observe the change.
  @volatile private var running = false

  override def open(parameters: Configuration): Unit = {
    running = true
  }

  override def run(ctx: SourceContext[(Int, Data)]): Unit = {
    val iter = (1 to 10000).iterator
    while (running && iter.hasNext) {
      val t = iter.next()
      (1 to numberOfKeys).foreach(key => ctx.collect((key, Data(t))))
    }
    running = false
  }

  override def cancel(): Unit = {
    running = false
  }
}

/**
 * Counts records per key within one parallel sink instance and, when that
 * instance is closed, prints how many distinct keys it saw and the per-key counts.
 */
final class KeyCountSink extends RichSinkFunction[(Int, Data)] {

  // Left unset at construction and created in open(), i.e. per running instance.
  private var counts: mutable.HashMap[Int, Int] = _

  override def open(parameters: Configuration): Unit = {
    counts = new mutable.HashMap()
  }

  override def invoke(record: (Int, Data)): Unit = {
    val key = record._1
    counts(key) = counts.getOrElse(key, 0) + 1
  }

  override def close(): Unit = {
    println(s"close : ${counts.size} $counts")
  }
}

/**
 * Measures how `keyBy` spreads a small number of distinct keys over the parallel
 * sink subtasks: each subtask prints the number of distinct keys it received.
 *
 * Uses an explicit `main` instead of `extends App`: `App` defers field
 * initialisation to `delayedInit`, so function objects that read the object's
 * fields from another JVM could observe them uninitialised (e.g. 0). Settings are
 * therefore local values passed explicitly into the functions.
 */
object FlinkTest {

  def main(args: Array[String]): Unit = {
    val parallelism = 5
    val numberOfKeys = 5 // 5, 10, 20, 30, 40, 50

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(parallelism)

    env
      .addSource(new KeySource(numberOfKeys))
      // Positional key: the first tuple field (the Int key).
      .keyBy(0)
      // .partitionCustom(
      //   new Partitioner[Int]() {
      //     override def partition(key: Int, numPartitions: Int): Int = key % numPartitions
      //   },
      //   _._1
      // )
      .addSink(new KeyCountSink)

    env.execute()
  }
}
Free forum by Nabble | Edit this page |