Calculating stateful counts per key

Calculating stateful counts per key

hkmaki

Hi,

 

I'm trying to calculate stateful counts per key with checkpoints following the example in https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#checkpointing-instance-fields. I would expect my test program to calculate the counts per key, but it seems to group the data by task rather than by key. Is this a Flink bug or have I misunderstood something?

 

The output of inputData.keyBy(0).flatMap(new TestCounters).print is:

1> (A,count=1)
1> (F,count=2)
2> (B,count=1)
2> (C,count=2)
2> (D,count=3)
2> (E,count=4)
2> (E,count=5)
2> (E,count=6)
2> (H,count=7)
4> (G,count=1)

 

while the output of inputData.keyBy(0).flatMapWithState(...).print is (as I would expect):

2> (B,1)
4> (G,1)
1> (A,1)
2> (C,1)
1> (F,1)
2> (D,1)
2> (E,1)
2> (E,2)
2> (E,3)
2> (H,1)

 

I would expect both to give the same results.

 

The full code:

 

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.util.Collector

object FlinkStreamingTest {

  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.createLocalEnvironment()

    val checkpointIntervalMillis = 10000
    env.enableCheckpointing(checkpointIntervalMillis)

    val inputData = env.fromElements(("A",0),("B",0),("C",0),("D",0),
      ("E",0),("E",0),("E",0),
      ("F",0),("G",0),("H",0))

    inputData.keyBy(0).flatMap(new TestCounters).print

    /*
    inputData.keyBy(0).flatMapWithState((keyAndCount: (String, Int), count: Option[Int]) =>
      count match {
        case None => (Iterator((keyAndCount._1, 1)), Some(1))
        case Some(c) => (Iterator((keyAndCount._1, c+1)), Some(c+1))
      }).print
    */

    env.execute("Counters test")
  }
}

case class CounterClass(var count: Int)

class TestCounters extends RichFlatMapFunction[(String, Int), (String, String)] with Checkpointed[CounterClass] {

  var counterValue: CounterClass = null

  override def flatMap(in: (String, Int), out: Collector[(String, String)]) = {
    counterValue.count = counterValue.count + 1
    out.collect((in._1, "count=" + counterValue.count))
  }

  override def open(config: Configuration): Unit = {
    if (counterValue == null) {
      counterValue = new CounterClass(0)
    }
  }

  override def snapshotState(checkpointId: Long, timestamp: Long): CounterClass = {
    counterValue
  }

  override def restoreState(state: CounterClass): Unit = {
    counterValue = state
  }
}

Disclaimer: This message and any attachments thereto are intended solely for the addressed recipient(s) and may contain confidential information. If you are not the intended recipient, please notify the sender by reply e-mail and delete the e-mail (including any attachments thereto) without producing, distributing or retaining any copies thereof. Any review, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient(s) is prohibited. Thank you.
Re: Calculating stateful counts per key

Greg Hogan
Hi Mäki,

This is the expected output. Your RichFlatMapFunction is opened once per task and you are sharing counterValue for all keys processed by that task.

Greg
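Greg's point can be illustrated without Flink at all. The sketch below simulates the two behaviors in plain Scala; the hash routing (hashCode modulo parallelism) is a simplification and not Flink's actual key-group assignment, and the parallelism value is illustrative:

```scala
object CounterSemantics {

  // One counter per task slot, shared by every key routed to that slot --
  // this mirrors an instance field in a RichFlatMapFunction.
  def sharedPerTask(input: Seq[String], parallelism: Int): Seq[(String, Int)] = {
    val perTask = Array.fill(parallelism)(0)
    input.map { k =>
      val task = math.abs(k.hashCode) % parallelism
      perTask(task) += 1
      (k, perTask(task))
    }
  }

  // One counter per key -- this mirrors flatMapWithState / key-value state.
  def perKey(input: Seq[String]): Seq[(String, Int)] = {
    val counts = scala.collection.mutable.Map.empty[String, Int]
    input.map { k =>
      val c = counts.getOrElse(k, 0) + 1
      counts(k) = c
      (k, c)
    }
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq("A", "B", "C", "D", "E", "E", "E", "F", "G", "H")
    println("shared per task: " + sharedPerTask(keys, 4))
    println("per key:         " + perKey(keys))
  }
}
```

With the thread's input, sharedPerTask reproduces the "counts keep growing within a task" output of TestCounters, while perKey reproduces the flatMapWithState output.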


On Mon, Dec 19, 2016 at 11:38 AM, Mäki Hanna <[hidden email]> wrote:



Re: Calculating stateful counts per key

Fabian Hueske-2
Hi Mäki,

Some additions to Greg's answer: the flatMapWithState shortcut of the Scala API uses Flink's key-value state, while your TestCounters class uses the Checkpointed interface.
As Greg said, the Checkpointed interface operates at the operator level, not per key. The key-value state automatically switches the context to the current key.
The documentation has the details [1].

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
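Following Fabian's suggestion, TestCounters could be rewritten against the key-value state instead of the Checkpointed interface. A rough, untested sketch for the 1.1 API (the descriptor name "count" and the default value 0 are arbitrary choices):

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class KeyedCounters extends RichFlatMapFunction[(String, Int), (String, String)] {

  private var count: ValueState[Int] = _

  override def open(config: Configuration): Unit = {
    // Registered on a keyed stream, so Flink scopes this state to the
    // current key; it is also included in checkpoints automatically.
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[Int]("count", classOf[Int], 0))
  }

  override def flatMap(in: (String, Int), out: Collector[(String, String)]): Unit = {
    val next = count.value() + 1
    count.update(next)
    out.collect((in._1, "count=" + next))
  }
}
```

Used as inputData.keyBy(0).flatMap(new KeyedCounters).print, this should produce per-key counts matching the flatMapWithState output, since both go through the same key-scoped state.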

2016-12-20 16:34 GMT+01:00 Greg Hogan <[hidden email]>: