Hi, as per the mail subject I wanted to ask you if a State access (read and write) is synchronized.val airtrafficEvents = stream .keyBy(_.flightInfo.flight) .map(new UpdateIdFunction()) where UpdateIdFunction is a RichMapFunction with a ValueState and a MapState, with the following map method def map(value: AirTrafficEvent): AirTrafficEventWithId = { val flight = value.flightInfo.flight val time = value.instantValues.time AirTrafficEventWithId(value, createOrGetId(flight, time.getMillis)) } private def createOrGetId(_key: String, _time: Long): Int = { val tmpId = valuestate.value val entry = Option[(Int, Long)](lookupMap.get(_key)) //update ValueState or MapState if needed //return current updated ValueState or corresponding ID from updated MapState } So, I'm using the MapState to track the integer IDs of the events of the stream, retaining only the latest records inside the MapState, and I'm using the ValueState to generate an incremental integer ID for said events. Given all of this, I'm really not sure how the mapping is applied to the keyedstream in input: is it guaranteed that each time the method is called I'm getting the latest and updated value/map? Thank you for your attention, Federico |
Hello,
state is local to each parallel instance of an operator. Coupled with the fact that the "map" method is always called by the same thread (and never concurrently) the ValueState (or any state for that matter) will always return the latest values. On 10.09.2017 14:39, Federico D'Ambrosio wrote:
|
Hi, Thank you very much, Chesnay, for this clarification. 2017-09-11 19:36 GMT+02:00 Chesnay Schepler <[hidden email]>:
-- Federico D'Ambrosio
|
Free forum by Nabble | Edit this page |