ValueState in RichCoFlatMap, possible 1.2-SNAPSHOT regression

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

ValueState in RichCoFlatMap, possible 1.2-SNAPSHOT regression

swiesman

Hi all,

 

I was trying to implement a join similar to what was laid out in the Flink Forward talk "Joining Infinity: Windowless Stream Processing with Flink", and I have been running into some issues. I am running on 1.2-SNAPSHOT compiled for Scala 2.11 and suspect this may be a regression, so I am including the dev mailing list. When initializing the value state I receive a null pointer exception:

 

java.lang.NullPointerException

                at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:109)

                at com.mediamath.reporting.streaming.FlatMapper$.open(StreamingPipeline.scala:21)

                at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:154)

                at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:368)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:593)

                at java.lang.Thread.run(Thread.java:745)

 

 

Below is a minimum failing example:

 

import org.apache.flink.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

/**
 * Co-flat-map over a `Long` input and a `String` input that emits `(Long, String)` pairs.
 *
 * Longs are buffered until the `String` value state holds something other than the empty
 * string; then every buffered Long is emitted paired with that state value.
 *
 * NOTE(review): this is declared as a Scala `object`, i.e. one shared instance. The reply in
 * this thread identifies that as the cause of the reported exceptions and recommends
 * declaring it as a `class` instead.
 */
object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {

  // Longs received on the first input that have not been emitted yet. This is an ordinary
  // field of the function, not something obtained from the runtime context.
  val buffer = ArrayBuffer.empty[Long]


  // Handle to the String value state; assigned in open(), so it is null before open() runs.
  @transient var state: ValueState[String] = _


  /**
   * Looks up the value state named "state-descriptor" from the runtime context.
   *
   * The stack trace in the post reports the NullPointerException from the `getState`
   * call made in this method.
   */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // The third descriptor argument ("") is presumably the state's default value;
    // flatMap1 compares against the same literal -- confirm against the Flink API.
    state = getRuntimeContext.getState(new ValueStateDescriptor[String]("state-descriptor", classOf[String], ""))
  }

  /** Second input: stores the incoming String in the value state. */
  override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
    state.update(value)
  }

  /**
   * First input: buffers the incoming Long, then flushes the buffer if the state value is
   * not the empty string.
   */
  override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
    buffer += value

    // Only emit once the state holds something other than "".
    if (state.value() != "") {
      for (elem ← buffer) {
        out.collect((elem, state.value()))
      }

      // Everything buffered has been emitted; start over with an empty buffer.
      buffer.clear()
    }
  }
}

/**
 * Driver for the reproducer: connects a stream of Longs with a stream of Strings, keys
 * both sides by a Boolean, and runs them through `FlatMapper`.
 */
object StreamingPipeline {

  /**
   * Builds and executes the job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(new MemoryStateBackend())

    // First input: a generated sequence from 0 to 1000.
    val pipeline1 = env.generateSequence(0, 1000)

    // Second input: the two strings "even" and "odd".
    val pipeline2 = env.fromElements("even", "odd")

    // Both key selectors yield a Boolean: even numbers and the string "even" map to
    // `true`, everything else to `false`, so each number shares a key with its label.
    pipeline1.connect(pipeline2)
      .keyBy(
        elem => elem % 2 == 0,
        elem => elem == "even"
      ).flatMap(FlatMapper)
      .print()

    env.execute("Example")
  }
}

I also attempted retrieving the state each time I needed it:

 

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

/**
 * Variant of the co-flat-map that fetches the value state from the runtime context on
 * every call instead of caching the handle in `open`.
 *
 * Emits `(Long, String)` pairs: Longs are buffered until the `String` state holds something
 * other than the empty string, then every buffered Long is emitted with that state value.
 *
 * NOTE(review): this is declared as a Scala `object`, i.e. one shared instance. The reply in
 * this thread identifies that as the cause of the reported exceptions and recommends
 * declaring it as a `class` instead.
 */
object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {
  // Descriptor reused for every state lookup. The third argument ("") is presumably the
  // state's default value; flatMap1 compares against the same literal -- confirm.
  val descriptor = new ValueStateDescriptor[String]("state-descriptor", classOf[String], "")

  // Longs received on the first input that have not been emitted yet. This is an ordinary
  // field of the function, not something obtained from the runtime context.
  val buffer = ArrayBuffer.empty[Long]

  /** Second input: looks up the state and stores the incoming String in it. */
  override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
    getRuntimeContext.getState(descriptor).update(value)
  }

  /**
   * First input: buffers the incoming Long, then flushes the buffer if the state value is
   * not the empty string.
   */
  override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
    buffer += value

    // State handle is looked up afresh on each invocation.
    val state = getRuntimeContext.getState(descriptor)
   
    // Only emit once the state holds something other than "".
    if (state.value() != "") {
      for (elem ← buffer) {
        out.collect((elem, state.value()))
      }

      // Everything buffered has been emitted; start over with an empty buffer.
      buffer.clear()
    }
  }
}

/**
 * Driver for the second reproducer: connects a stream of Longs with a stream of Strings,
 * keys both sides by a Boolean, and runs them through `FlatMapper`.
 */
object StreamingPipeline {

  /**
   * Builds and executes the job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(new MemoryStateBackend())

    // First input: a generated sequence from 0 to 1000.
    val pipeline1 = env.generateSequence(0, 1000)

    // Second input: the two strings "even" and "odd".
    val pipeline2 = env.fromElements("even", "odd")

    // Both key selectors yield a Boolean: even numbers and the string "even" map to
    // `true`, everything else to `false`, so each number shares a key with its label.
    pipeline1.connect(pipeline2)
      .keyBy(
        elem => elem % 2 == 0,
        elem => elem == "even"
      ).flatMap(FlatMapper)
      .print()

    env.execute("Example")
  }

}

 

but this results in this precondition failing on updates.

 

Seth Wiesman

Reply | Threaded
Open this post in threaded view
|

Re: ValueState in RichCoFlatMap, possible 1.2-SNAPSHOT regression

Stefan Richter
Hi,

the problem is this line

object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {

which should use "class" instead of "object". Otherwise, a single singleton instance of the FlatMapper is shared by Flink across multiple operator instances, which leads to the exceptions you are seeing.

Best,
Stefan

Am 20.10.2016 um 23:22 schrieb Seth Wiesman <[hidden email]>:

Hi all, 
 
I was trying to implement a join similar to what was laid out in the Flink Forward talk "Joining Infinity: Windowless Stream Processing with Flink", and I have been running into some issues. I am running on 1.2-SNAPSHOT compiled for Scala 2.11 and suspect this may be a regression, so I am including the dev mailing list. When initializing the value state I receive a null pointer exception:
 
java.lang.NullPointerException
                at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:109)
                at com.mediamath.reporting.streaming.FlatMapper$.open(StreamingPipeline.scala:21)
                at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:154)
                at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:368)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:593)
                at java.lang.Thread.run(Thread.java:745)
 
 
Below is a minimum failing example:
 
import org.apache.flink.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

/**
 * Co-flat-map over a `Long` input and a `String` input that emits `(Long, String)` pairs.
 *
 * Longs are buffered until the `String` value state holds something other than the empty
 * string; then every buffered Long is emitted paired with that state value.
 *
 * NOTE(review): this is declared as a Scala `object`, i.e. one shared instance. The reply
 * quoting this code identifies that as the cause of the reported exceptions and recommends
 * declaring it as a `class` instead.
 */
object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {

  // Longs received on the first input that have not been emitted yet. This is an ordinary
  // field of the function, not something obtained from the runtime context.
  val buffer = ArrayBuffer.empty[Long]

  // Handle to the String value state; assigned in open(), so it is null before open() runs.
  @transient var state: ValueState[String] = _

  /**
   * Looks up the value state named "state-descriptor" from the runtime context.
   *
   * The quoted stack trace reports the NullPointerException from the `getState` call made
   * in this method.
   */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // The third descriptor argument ("") is presumably the state's default value;
    // flatMap1 compares against the same literal -- confirm against the Flink API.
    state = getRuntimeContext.getState(new ValueStateDescriptor[String]("state-descriptor", classOf[String], ""))
  }

  /** Second input: stores the incoming String in the value state. */
  override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
    state.update(value)
  }

  /**
   * First input: buffers the incoming Long, then flushes the buffer if the state value is
   * not the empty string.
   */
  override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
    buffer += value

    // Only emit once the state holds something other than "".
    if (state.value() != "") {
      for (elem ← buffer) {
        out.collect((elem, state.value()))
      }

      // Everything buffered has been emitted; start over with an empty buffer.
      buffer.clear()
    }
  }
}

/**
 * Driver for the reproducer: connects a stream of Longs with a stream of Strings, keys
 * both sides by a Boolean, and runs them through `FlatMapper`.
 */
object StreamingPipeline {

  /**
   * Builds and executes the job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(new MemoryStateBackend())

    // First input: a generated sequence from 0 to 1000.
    val pipeline1 = env.generateSequence(0, 1000)

    // Second input: the two strings "even" and "odd".
    val pipeline2 = env.fromElements("even", "odd")

    // Both key selectors yield a Boolean: even numbers and the string "even" map to
    // `true`, everything else to `false`, so each number shares a key with its label.
    pipeline1.connect(pipeline2)
      .keyBy(
        elem => elem % 2 == 0,
        elem => elem == "even"
      ).flatMap(FlatMapper)
      .print()

    env.execute("Example")
  }
}
I also attempted retrieving the state each time I needed it:
 
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

/**
 * Variant of the co-flat-map that fetches the value state from the runtime context on
 * every call instead of caching the handle in `open`.
 *
 * Emits `(Long, String)` pairs: Longs are buffered until the `String` state holds something
 * other than the empty string, then every buffered Long is emitted with that state value.
 *
 * NOTE(review): this is declared as a Scala `object`, i.e. one shared instance. The reply
 * quoting this code identifies that as the cause of the reported exceptions and recommends
 * declaring it as a `class` instead.
 */
object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {
  // Descriptor reused for every state lookup. The third argument ("") is presumably the
  // state's default value; flatMap1 compares against the same literal -- confirm.
  val descriptor = new ValueStateDescriptor[String]("state-descriptor", classOf[String], "")

  // Longs received on the first input that have not been emitted yet. This is an ordinary
  // field of the function, not something obtained from the runtime context.
  val buffer = ArrayBuffer.empty[Long]

  /** Second input: looks up the state and stores the incoming String in it. */
  override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
    getRuntimeContext.getState(descriptor).update(value)
  }

  /**
   * First input: buffers the incoming Long, then flushes the buffer if the state value is
   * not the empty string.
   */
  override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
    buffer += value

    // State handle is looked up afresh on each invocation.
    val state = getRuntimeContext.getState(descriptor)
    
    // Only emit once the state holds something other than "".
    if (state.value() != "") {
      for (elem ← buffer) {
        out.collect((elem, state.value()))
      }

      // Everything buffered has been emitted; start over with an empty buffer.
      buffer.clear()
    }
  }
}

/**
 * Driver for the second reproducer: connects a stream of Longs with a stream of Strings,
 * keys both sides by a Boolean, and runs them through `FlatMapper`.
 */
object StreamingPipeline {

  /**
   * Builds and executes the job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(new MemoryStateBackend())

    // First input: a generated sequence from 0 to 1000.
    val pipeline1 = env.generateSequence(0, 1000)

    // Second input: the two strings "even" and "odd".
    val pipeline2 = env.fromElements("even", "odd")

    // Both key selectors yield a Boolean: even numbers and the string "even" map to
    // `true`, everything else to `false`, so each number shares a key with its label.
    pipeline1.connect(pipeline2)
      .keyBy(
        elem => elem % 2 == 0,
        elem => elem == "even"
      ).flatMap(FlatMapper)
      .print()

    env.execute("Example")
  }

}
 
but this results in this precondition failing on updates. 
 
Seth Wiesman