Hi,

I'm trying to calculate stateful counts per key with checkpointing, following the example in
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#checkpointing-instance-fields. I would expect my test program to calculate the counts per key, but it seems to group the data by task rather than by key. Is this a Flink bug, or have I misunderstood something?

The output of inputData.keyBy(0).flatMap(new TestCounters).print is

1> (A,count=1)
1> (F,count=2)
2> (B,count=1)
2> (C,count=2)
2> (D,count=3)
2> (E,count=4)
2> (E,count=5)
2> (E,count=6)
2> (H,count=7)
4> (G,count=1)

while the output of inputData.keyBy(0).flatMapWithState(...).print is, as I would expect,

2> (B,1)
4> (G,1)
1> (A,1)
2> (C,1)
1> (F,1)
2> (D,1)
2> (E,1)
2> (E,2)
2> (E,3)
2> (H,1)

I would expect both to give the same results.
The full code:

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.checkpoint.Checkpointed
    import org.apache.flink.util.Collector

    object FlinkStreamingTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.createLocalEnvironment()
        val checkpointIntervalMillis = 10000
        env.enableCheckpointing(checkpointIntervalMillis)

        val inputData = env.fromElements(("A",0),("B",0),("C",0),("D",0),
          ("E",0),("E",0),("E",0),
          ("F",0),("G",0),("H",0))

        inputData.keyBy(0).flatMap(new TestCounters).print()

        /*
        inputData.keyBy(0).flatMapWithState((keyAndCount: (String, Int), count: Option[Int]) =>
          count match {
            case None => (Iterator((keyAndCount._1, 1)), Some(1))
            case Some(c) => (Iterator((keyAndCount._1, c+1)), Some(c+1))
          }).print()
        */

        env.execute("Counters test")
      }
    }

    case class CounterClass(var count: Int)

    class TestCounters extends RichFlatMapFunction[(String, Int), (String, String)] with Checkpointed[CounterClass] {
      var counterValue: CounterClass = null

      override def flatMap(in: (String, Int), out: Collector[(String, String)]): Unit = {
        counterValue.count = counterValue.count + 1
        out.collect((in._1, "count=" + counterValue.count))
      }

      override def open(config: Configuration): Unit = {
        if (counterValue == null) {
          counterValue = new CounterClass(0)
        }
      }

      override def snapshotState(l: Long, l1: Long): CounterClass = {
        counterValue
      }

      override def restoreState(state: CounterClass): Unit = {
        counterValue = state
      }
    }
Hi Mäki,

This is the expected output. Your RichFlatMapFunction is instantiated and opened once per parallel task, so counterValue is a single field shared by all keys processed by that task.

On Mon, Dec 19, 2016 at 11:38 AM, Mäki Hanna <[hidden email]> wrote:
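To see why the two outputs differ, the following pure-Scala model (no Flink involved) replays the records in the order each task printed them and counts them twice: once with a single counter per task, as TestCounters does, and once with a counter per key, as flatMapWithState does. This is only a sketch: the names CountingModel, perTaskCounts, and perKeyCounts are hypothetical, and the key-to-task assignment is copied from the "1>", "2>", "4>" prefixes in the output above rather than computed by Flink's partitioner.

```scala
import scala.collection.mutable

// Hypothetical model, not Flink code: compares one counter per task with one counter per key.
object CountingModel {
  // Records in the order each task saw them, copied from the printed output
  // (assumption: Flink's real key-to-task assignment is not modeled here).
  val recordsPerTask: Map[Int, Seq[String]] = Map(
    1 -> Seq("A", "F"),
    2 -> Seq("B", "C", "D", "E", "E", "E", "H"),
    4 -> Seq("G")
  )

  // Like TestCounters: one counter per task, shared by every key that task processes.
  def perTaskCounts(tasks: Map[Int, Seq[String]]): Map[Int, Seq[(String, Int)]] =
    tasks.map { case (task, keys) =>
      var counter = 0
      task -> keys.map { key => counter += 1; (key, counter) }
    }

  // Like keyed state: one counter per key. A per-task map is enough here because
  // each key is routed to exactly one task.
  def perKeyCounts(tasks: Map[Int, Seq[String]]): Map[Int, Seq[(String, Int)]] =
    tasks.map { case (task, keys) =>
      val counters = mutable.Map.empty[String, Int].withDefaultValue(0)
      task -> keys.map { key => counters(key) += 1; (key, counters(key)) }
    }

  def main(args: Array[String]): Unit = {
    // prints List((B,1), (C,2), (D,3), (E,4), (E,5), (E,6), (H,7))
    println(perTaskCounts(recordsPerTask)(2))
    // prints List((B,1), (C,1), (D,1), (E,1), (E,2), (E,3), (H,1))
    println(perKeyCounts(recordsPerTask)(2))
  }
}
```

The per-task variant reproduces the TestCounters output for task 2, and the per-key variant reproduces the flatMapWithState output.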
Hi Mäki,

some additions to Greg's answer: the flatMapWithState shortcut of the Scala API uses Flink's key-value state, while your TestCounters class uses the Checkpointed interface [1]. State exposed through Checkpointed belongs to the operator instance (the task), so one CounterClass object is snapshotted per task, not per key.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html

2016-12-20 16:34 GMT+01:00 Greg Hogan <[hidden email]>:
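One way to get per-key counts from a RichFlatMapFunction, as the flatMapWithState shortcut does, is to keep the counter in key-value state obtained via getRuntimeContext.getState instead of in an instance field. A minimal sketch, assuming the Flink 1.1 Scala API (the version the linked documentation covers) is on the classpath; the names KeyedTestCounters and KeyedCountersTest and the state name "count" are hypothetical. Keyed state is only available when the function runs on a KeyedStream, i.e. after keyBy.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical replacement for TestCounters: the counter lives in Flink's keyed state,
// so each key has its own value instead of sharing one field per task.
class KeyedTestCounters extends RichFlatMapFunction[(String, Int), (String, String)] {
  // Handle to the keyed state; each value()/update() call is scoped to the current record's key.
  @transient private var count: ValueState[Int] = _

  override def open(config: Configuration): Unit = {
    // (name, type information, default value) constructor as available in Flink 1.1;
    // later releases deprecate the default-value variant.
    val descriptor = new ValueStateDescriptor[Int]("count", createTypeInformation[Int], 0)
    count = getRuntimeContext.getState(descriptor)
  }

  override def flatMap(in: (String, Int), out: Collector[(String, String)]): Unit = {
    // value() returns the default (0) the first time a key is seen.
    val newCount = count.value() + 1
    count.update(newCount)
    out.collect((in._1, "count=" + newCount))
  }
}

object KeyedCountersTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    // With checkpointing enabled, Flink checkpoints the keyed state automatically.
    env.enableCheckpointing(10000)

    val inputData = env.fromElements(("A", 0), ("B", 0), ("C", 0), ("D", 0),
      ("E", 0), ("E", 0), ("E", 0), ("F", 0), ("G", 0), ("H", 0))

    // keyBy is required: the ValueState above is scoped per key of the KeyedStream.
    inputData.keyBy(0).flatMap(new KeyedTestCounters).print()

    env.execute("Keyed counters test")
  }
}
```

With this function the program should report the same per-key counts as the flatMapWithState variant, although the task prefixes and the interleaving of lines can differ between runs.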