Is State access synchronized?

Federico D'Ambrosio
Hi,

As the subject says, I'd like to ask whether State access (reads and writes) is synchronized.

I have the following stream:

val airtrafficEvents = stream
  .keyBy(_.flightInfo.flight)
  .map(new UpdateIdFunction())


where UpdateIdFunction is a RichMapFunction holding a ValueState and a MapState, with the following map method:

def map(value: AirTrafficEvent): AirTrafficEventWithId = {

  val flight = value.flightInfo.flight
  val time = value.instantValues.time

  AirTrafficEventWithId(value, createOrGetId(flight, time.getMillis))

}

private def createOrGetId(_key: String, _time: Long): Int = {

  val tmpId = valuestate.value

  //Remove from MapState entries older than one minute

  val entry = Option[(Int, Long)](lookupMap.get(_key))

  //update ValueState or MapState if needed

  //return current updated ValueState or corresponding ID from updated MapState

}

So, I'm using the MapState to track the integer IDs of the stream's events, retaining only the latest records in it, and I'm using the ValueState to generate an incremental integer ID for those events.
Given all of this, I'm not sure how the mapping is applied to the input KeyedStream: is it guaranteed that each time the method is called, I get the latest, up-to-date value/map?

Thank you for your attention,
Federico
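The ID scheme described in the question above can be sketched in plain Scala. This is a hypothetical reconstruction of the commented steps in `createOrGetId`, not the poster's actual code: an `Option[Int]` field stands in for the ValueState counter and a `mutable.Map[String, (Int, Long)]` stands in for the MapState, so the sketch runs without Flink. The class name `IdAssigner`, the `windowMillis` parameter (60,000 ms, matching the "older than one minute" comment), and refreshing an entry's timestamp on every hit are assumptions. In the real job both states are keyed state, so they would additionally be scoped to the key of the current element (the flight).

```scala
import scala.collection.mutable

// Hypothetical sketch: plain Scala collections stand in for Flink state so
// the logic can run on its own. Not the original UpdateIdFunction.
final class IdAssigner(windowMillis: Long = 60000L) {
  // Stand-in for the ValueState: the last ID handed out (None = never set).
  private var lastId: Option[Int] = None
  // Stand-in for the MapState: key -> (assigned ID, last-seen time in ms).
  private val lookup = mutable.Map.empty[String, (Int, Long)]

  def createOrGetId(key: String, time: Long): Int = {
    // Remove entries older than the window (one minute by default).
    val stale = lookup.keys.filter(k => time - lookup(k)._2 > windowMillis).toList
    lookup --= stale

    lookup.get(key) match {
      case Some((id, _)) =>
        // Assumption: a hit refreshes the entry's last-seen timestamp.
        lookup.update(key, (id, time))
        id
      case None =>
        // New or expired key: generate the next incremental ID.
        val next = lastId.getOrElse(0) + 1
        lastId = Some(next)
        lookup.update(key, (next, time))
        next
    }
  }
}
```

With this sketch, a key seen again within the window keeps its ID, while a key that has been idle for longer than the window is evicted and receives a fresh, incremented ID.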
Re: Is State access synchronized?

Chesnay Schepler
Hello,

State is local to each parallel instance of an operator, and keyed state is further scoped to the key of the element currently being processed. Combined with the fact that the "map" method is always called by the same thread (and never concurrently), the ValueState (or any other state, for that matter) will always return the latest values.
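To illustrate why no extra locking is needed, here is a minimal plain-Scala model of a single parallel instance. It is not Flink's implementation; the class `SingleThreadedKeyedInstance` and the `Demo` object are hypothetical and only mirror the two properties described above: the state lives inside the instance, and records reach `map` one at a time, so each read-modify-write starts from the value left by the previous record with the same key.

```scala
import scala.collection.mutable

// Hypothetical model of ONE parallel operator instance (not Flink's code).
// It mirrors two properties: state is local to the instance, and records
// are processed sequentially by a single thread.
final class SingleThreadedKeyedInstance {
  // Keyed-state stand-in: one counter per key, owned by this instance only.
  private val counterPerKey = mutable.Map.empty[String, Int]

  // Called once per record, in arrival order, never concurrently, so this
  // read-modify-write needs no lock and always reads the latest value.
  def map(key: String): (String, Int) = {
    val current = counterPerKey.getOrElse(key, 0) // like value() for this key
    val updated = current + 1
    counterPerKey.update(key, updated)            // like update() for this key
    (key, updated)
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val instance = new SingleThreadedKeyedInstance
    val out = List("AZ100", "LH200", "AZ100", "AZ100").map(k => instance.map(k))
    println(out) // List((AZ100,1), (LH200,1), (AZ100,2), (AZ100,3))
  }
}
```

Each update is visible to the next record with the same key, while different keys (AZ100, LH200) keep independent counters.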

On 10.09.2017 14:39, Federico D'Ambrosio wrote:

Re: Is State access synchronized?

Federico D'Ambrosio
Hi,

Thank you very much, Chesnay, for this clarification.

2017-09-11 19:36 GMT+02:00 Chesnay Schepler <[hidden email]>:

--
Federico D'Ambrosio