ValueState is missing

ValueState is missing

Dong-iL, Kim
Hi.
I'm using Flink 1.0.3 on AWS EMR.
Sporadically, the value of a ValueState is lost.
What is a good starting point for solving this problem?
Thank you.

Re: ValueState is missing

Kostas Kloudas
Hello,

Could you share the code of the job you are running?
With only this information I am afraid we cannot help much.

Thanks,
Kostas

Re: ValueState is missing

Ufuk Celebi
What exactly do you mean by "lost"?

Do you call value() and get a value (!= null/defaultValue), then call it
again and get null/defaultValue for the same key, with no update in
between?


Re: ValueState is missing

Dong-iL, Kim

My code and log are below.


    val getExecuteEnv: StreamExecutionEnvironment = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
        env.getCheckpointConfig.setCheckpointTimeout(60000)
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
        env
    }

def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] =
        target.keyBy(_._3).flatMap(new StateOperator)

def main(args: Array[String]) {
        val env = getExecuteEnv
        val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
        val json = deserializeToJsonObj(source).name("ConvertToJson")
        val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
        val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream")

}

class StateOperator extends RichFlatMapFunction[(String, String, String, String, Long), WinLossBase] {
        var playerState: ValueState[util.Map[String, PotPlayer]] = _
        var handState: ValueState[HandHistoryInfo] = _

        override def open(param: Configuration): Unit = {
            val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
                classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]())
            playerState = getRuntimeContext.getState(playerValueStateDescriptor)
            handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
        }

        override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]): Unit = {
            in._2 match {
                case "GameStartHistory" =>
                    val players = playerState.value()
                    val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
                    val record = obj.asInstanceOf[GameStartHistoryRecord]
                    val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
                    if (LOG.isInfoEnabled())
                        LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL")
                    ...
                    playerState.update(players)
                    handState.update(handHistoryInfo)
                case "HoleCardHistory" =>
                    val players = playerState.value()
                    if (players != null) {
                        ...
                         playerState.update(players)
                    } else LOG.warn("there is no player[hole card]. {}", in._4)
                case "PlayerStateHistory" =>
                    val players = playerState.value()
                    if (players != null) {
                        ...
                        playerState.update(players)
                    } else LOG.warn("there is no player[player state]. {}", in._4)
                case "CommCardHistory" =>
                    val handHistoryInfo = handState.value()
                    val commCardHistory: CommCardHistory = commCardState.value()
                    if (handHistoryInfo != null) {
                        ...
                        handState.update(handHistoryInfo)
                        commCardState.update(commCardHistory)
                    } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
                case "PlayerActionHistory" =>
                    val handHistoryInfo = handState.value()
                    val players = playerState.value()

                    if (handHistoryInfo != null) {
                        ...
                    } else LOG.warn("there is no handhistory info[player action]. {}", in._4)
                case "PotHistory" =>
                    val players = playerState.value()
                    val handHistoryInfo = handState.value()
                    val commCardHistory: CommCardHistory = commCardState.value()
                    if (handHistoryInfo != null && handHistoryInfo.playType == PlayType.Cash && players != null && players.size > 1) {
                        ...
                    } else LOG.warn("there is no handhistory info[pot]. {}", in._4)
                case "GameEndHistory" =>
                    val players = playerState.value()
                    val handHistoryInfo = handState.value()
                        ...
                    if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
                    playerState.clear()
                    handState.clear()
                case _ =>
            }
        }

—— log ——
2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO  com.nsuslab.denma.stream.winloss.flow.Main$  - hand start 5769392597641628595

2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN  com.nsuslab.denma.stream.winloss.flow.Main$  - there is no handhistory info[pot].
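
One detail in the posted code is worth checking against this log: the "PotHistory" branch logs "there is no handhistory info[pot]" whenever any clause of its condition fails, not only when handState is empty. The sketch below is plain Scala with simplified, hypothetical stand-in types (HandInfo, PlayType, and a Map[String, String] instead of the real PotPlayer map); it only illustrates how each clause could be reported separately to narrow the cause down.

```scala
object PotGuardSketch {
  // Hypothetical, simplified stand-ins for the types used in the posted job.
  sealed trait PlayType
  case object Cash extends PlayType
  case object Tournament extends PlayType
  final case class HandInfo(playType: PlayType)

  // Returns None when the pot branch would run, otherwise the first clause
  // of the original condition that fails.
  def skipReason(hand: HandInfo, players: java.util.Map[String, String]): Option[String] =
    if (hand == null) Some("hand state is null")
    else if (hand.playType != Cash) Some("play type is not Cash")
    else if (players == null) Some("player state is null")
    else if (players.size <= 1) Some("fewer than two players")
    else None
}
```

Logging the returned reason instead of a single fixed message would show whether the state is really missing or whether one of the other clauses is responsible for the warning.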

Re: ValueState is missing

Dong-iL, Kim
In my code, is the configuration of the execution environment all right?


Re: ValueState is missing

Maximilian Michels
You're clearing the "handState" on "GameEndHistory". I'm assuming this
event comes in before "CommCardHistory" where you check the state.
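
A minimal sketch of that scenario, in plain Scala: KeyedValue is a hypothetical stand-in for per-key state and is not Flink's ValueState; the key "hand-1" and the stored string are made up. It only shows that once the end event has cleared the state, a later event for the same key reads the default value (null here) again.

```scala
import scala.collection.mutable

// Hypothetical stand-in for keyed state; this is NOT Flink's ValueState.
final class KeyedValue[K, V](default: V) {
  private val store = mutable.Map.empty[K, V]
  def value(key: K): V = store.getOrElse(key, default)
  def update(key: K, v: V): Unit = store(key) = v
  def clear(key: K): Unit = { store.remove(key); () }
}

object ClearSketch {
  // Replays event kinds for a single key and returns the warnings that the
  // null checks in the posted operator would produce.
  def replay(kinds: List[String]): List[String] = {
    val handState = new KeyedValue[String, String](null)
    val key = "hand-1"
    kinds.flatMap {
      case "GameStartHistory" =>
        handState.update(key, "info")
        None
      case "GameEndHistory" =>
        handState.clear(key)
        None
      case other =>
        if (handState.value(key) == null) Some(s"no hand info [$other]") else None
    }
  }
}
```

With the sequence GameStartHistory, GameEndHistory, CommCardHistory the replay yields one warning; with GameStartHistory, CommCardHistory, GameEndHistory it yields none.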


Re: ValueState is missing

Dong-iL, Kim
Nope.
I added logging to the GameEndHistory case,
but I still get the same log.
Is there any fault in my code?

Thank you.


Re: ValueState is missing

Stephan Ewen
Hi!

It's not that easy to say at first glance.

One thing that is important to bear in mind is which ordering guarantees Flink gives, and where it gives none.
When you use keyBy() or a redistributing operation such as rebalance(), order is preserved per parallel source/target pair only.

Have a look here:


Could it be that the events simply arrive in a different order in the functions, so that a later event that looks for state comes before an earlier event that creates the state?

Greetings,
Stephan
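
The ordering point can be illustrated with a small plain-Scala simulation (no Flink classes are used). It assumes, hypothetically, that the events of one hand reach the keyed operator through two different upstream sub-tasks; the names Event, merge and eventsWithoutState are made up for this sketch. Each channel is consumed strictly front to back, yet the interleaving across channels is free.

```scala
object OrderingSketch {
  final case class Event(handId: String, kind: String)

  // Merges channels by following a schedule of channel indices. Each channel
  // is consumed front to back, so order within a channel is preserved.
  def merge(channels: Vector[List[Event]], schedule: List[Int]): List[Event] = {
    val (_, out) = schedule.foldLeft((channels, List.empty[Event])) {
      case ((chs, acc), i) =>
        chs(i) match {
          case head :: tail => (chs.updated(i, tail), head :: acc)
          case Nil          => (chs, acc)
        }
    }
    out.reverse
  }

  // Returns the events that would find no state. State is created by
  // GameStartHistory and cleared by GameEndHistory, as in the posted operator.
  def eventsWithoutState(arrival: List[Event]): List[Event] = {
    val (_, missing) = arrival.foldLeft((Set.empty[String], List.empty[Event])) {
      case ((started, miss), e) =>
        e.kind match {
          case "GameStartHistory"     => (started + e.handId, miss)
          case "GameEndHistory"       => (started - e.handId, miss)
          case _ if started(e.handId) => (started, miss)
          case _                      => (started, e :: miss)
        }
    }
    missing.reverse
  }

  def main(args: Array[String]): Unit = {
    val fromSubtask0 = List(Event("h1", "GameStartHistory"))
    val fromSubtask1 = List(Event("h1", "PotHistory"), Event("h1", "GameEndHistory"))
    val channels = Vector(fromSubtask0, fromSubtask1)

    // Both schedules respect per-channel order; only the interleaving differs.
    println(eventsWithoutState(merge(channels, List(0, 1, 1)))) // List()
    println(eventsWithoutState(merge(channels, List(1, 0, 1)))) // List(Event(h1,PotHistory))
  }
}
```

In the second schedule the PotHistory event is processed before the GameStartHistory event that would have created the state, which would produce exactly the kind of warning shown in the log. Whether this is what happens in the job depends on how the events are spread over the Kafka partitions and source sub-tasks.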

>>>                      ...
>>>                   if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
>>>                   playerState.clear()
>>>                   handState.clear()
>>>               case _ =>
>>>           }
>>>       }
>>>
>>> —— log ——
>>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO  com.nsuslab.denma.stream.winloss.flow.Main$  - hand start 5769392597641628595
>>>
>>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN  com.nsuslab.denma.stream.winloss.flow.Main$  - there is no handhistory info[pot].
>>>
>>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <[hidden email]> wrote:
>>>>
>>>> What do you mean with lost exactly?
>>>>
>>>> You call value() and it returns a value (!= null/defaultValue) and you
>>>> call it again and it returns null/defaultValue for the same key with
>>>> no update in between?
>>>>
>>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
>>>> <[hidden email]> wrote:
>>>>> Hello,
>>>>>
>>>>> Could you share the code of the job you are running?
>>>>> With only this information I am afraid we cannot help much.
>>>>>
>>>>> Thanks,
>>>>> Kostas
>>>>>
>>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <[hidden email]> wrote:
>>>>>>
>>>>>> Hi.
>>>>>> I’m using flink 1.0.3 on aws EMR.
>>>>>> sporadically value of ValueState is lost.
>>>>>> what is starting point for solving this problem.
>>>>>> Thank you.
>>>>>
>>>
>>



Re: ValueState is missing

Dong-iL, Kim
Hi.
I checked the order of the data, and it is correct.
Are there any other possibilities?
Thank you.


Re: ValueState is missing

Stephan Ewen
Hi!

So far we are not aware of a state loss bug in Flink. My guess is that it is some subtlety in the program.

The condition that triggers the log message also contains other checks, such as "handHistoryInfo.playType == PlayType.Cash" and "players.size > 1". Could one of them be the problem?
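
As a sketch of how to tell these cases apart, the combined condition can be split so that each failing check reports its own reason. The types below are hypothetical stand-ins for the job's HandHistoryInfo and PlayType (only the fields used in the check are modelled, and Tournament is an invented second value), and skipReason is an illustrative helper, not part of the original program.

```scala
// Hypothetical stand-ins for the job's own types.
object PlayType {
  sealed trait Value
  case object Cash extends Value
  case object Tournament extends Value // invented value, for illustration only
}

final case class HandHistoryInfo(handHistoryId: Long, playType: PlayType.Value)

object PotCheck {
  /** Returns None when the pot event can be processed, otherwise the reason it is skipped. */
  def skipReason[V](hand: HandHistoryInfo, players: java.util.Map[String, V]): Option[String] =
    if (hand == null) Some("no hand history info")
    else if (hand.playType != PlayType.Cash) Some(s"play type is ${hand.playType}, not Cash")
    else if (players == null) Some("no player map")
    else if (players.size() <= 1) Some(s"only ${players.size()} player(s)")
    else None
}
```

In the "PotHistory" branch, logging the returned reason instead of the fixed "there is no handhistory info[pot]" text would show whether the state is really null or whether one of the other checks failed.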


To debug this, you can try and do the following:

Rather than using Flink's key/value state, simply use your own Java/Scala map in the RichFlatMapFunction.
That is not fault-tolerant by default, but you can use it to see whether the error occurs in the same way.
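
A minimal sketch of that idea, kept free of Flink dependencies so it can be run on its own. LocalHandTracker and its onEvent method are hypothetical names; in the job, the map would be a field of the RichFlatMapFunction and onEvent would be called from flatMap. Because one parallel instance of the function receives many keys, the map is indexed explicitly by the same key that is passed to keyBy (in._3 in the posted code).

```scala
import scala.collection.mutable

// Hypothetical helper: tracks the current hand per key in a plain in-memory map.
// Nothing here is checkpointed, so the contents are lost on a restart.
final class LocalHandTracker {
  private val hands = mutable.HashMap.empty[String, Long]

  /** Returns a warning text when state was expected but is absent, otherwise None. */
  def onEvent(key: String, eventType: String, handHistoryId: Long): Option[String] =
    eventType match {
      case "GameStartHistory" =>
        hands.update(key, handHistoryId) // corresponds to handState.update(...)
        None
      case "GameEndHistory" =>
        hands.remove(key) // corresponds to handState.clear()
        None
      case other =>
        if (hands.contains(key)) None
        else Some(s"no hand info for key $key on $other")
    }
}
```

If the warnings appear with the plain map as well, the cause is more likely in the event flow or the program logic than in Flink's state handling.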

Greetings,
Stephan





Re: ValueState is missing

Dong-iL, Kim
Hi.
I tested the program with a window function (keyBy -> window -> collect), and it has no problem.

In my old program (keyBy -> state processing), can a single key be processed by multiple threads?

Thank you.


Re: ValueState is missing

Dong-iL, Kim
Hi, Stephan.

Do you mean using a map in local execution?
I’ve tested it, but it does not work at all.
Thanks.

On Aug 15, 2016, at 4:56 PM, Dong-iL, Kim <[hidden email]> wrote:

Hi.
I've tested the program with window function(keyBy->window->collect). it has no problem.

my old program. (keyBy-> state processing). can it be processed by multiple thread within a key?

Thank you.

On Aug 12, 2016, at 8:27 PM, Stephan Ewen <[hidden email]> wrote:

Hi!

So far we are not aware of a state loss bug in Flink. My guess is that it is some subtlety in the program.

The check that logs also has other checks, like "handHistoryInfo.playType == PlayType.Cash" and "players.size > 1". Is one of them maybe the problem?


To debug this, you can try and do the following:

Rather than using Flink's key/value state, simply use your own java/scala map in the RichFlatMapFunction.
That is not by default fault-tolerant, but you can use that to see if the error occurs in the same way or not.

Greetings,
Stephan




On Fri, Aug 12, 2016 at 12:46 PM, Dong-iL, Kim <[hidden email]> wrote:
Hi.
I checked order of data. but it is alright.
Is there any other possibilities?
Thank you.

On Aug 12, 2016, at 7:09 PM, Stephan Ewen <[hidden email]> wrote:

Hi!

Its not that easy to say at a first glance.

One thing that is important to bear in mind is what ordering guarantees Flink gives, and where the ordering guarantees are not given.
When you use keyBy() or redistribute(), order is preserved per parallel source/target pair only.

Have a look here:


Could it be that the events simply arrive in a different order in the functions, so that a later event that looks for state comes before an earlier event that creates the state?

Greetings,
Stephan

On Fri, Aug 12, 2016 at 12:04 PM, Dong-iL, Kim <[hidden email]> wrote:
Nope.
I added log in End.
but there is same log.
is there any fault in my code?

thank you.


> On Aug 12, 2016, at 6:42 PM, Maximilian Michels <[hidden email]> wrote:
>
> You're clearing the "handState" on "GameEndHistory". I'm assuming this
> event comes in before "CommCardHistory" where you check the state.
>
> On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <[hidden email]> wrote:
>> in my code, is the config of ExecutionEnv alright?
>>
>>
>>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <[hidden email]> wrote:
>>>
>>>
>>> my code and log is as below.
>>>
>>>
>>>   val getExecuteEnv: StreamExecutionEnvironment = {
>>>       val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
>>>       env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>>       env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>>       env.getCheckpointConfig.setCheckpointTimeout(60000)
>>>       env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
>>>       env
>>>   }
>>>
>>> def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] =
>>>       target.keyBy(_._3).flatMap(new StateOperator)
>>>
>>> def main(args: Array[String]) {
>>>       val env = getExecuteEnv
>>>       val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
>>>       val json = deserializeToJsonObj(source).name("ConvertToJson")
>>>       val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
>>>       val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream")
>>> …
>>> }
>>>
>>> class StateOperator extends RichFlatMapFunction[(String, String, String, String, Long), WinLossBase] {
>>>       var playerState: ValueState[util.Map[String, PotPlayer]] = _
>>>       var handState: ValueState[HandHistoryInfo] = _
>>>
>>>       override def open(param: Configuration): Unit = {
>>>           val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
>>>               classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]())
>>>           playerState = getRuntimeContext.getState(playerValueStateDescriptor)
>>>           handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>>>       }
>>>
>>>       override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]): Unit = {
>>>           in._2 match {
>>>               case "GameStartHistory" =>
>>>                   val players = playerState.value()
>>>                   val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
>>>                   val record = obj.asInstanceOf[GameStartHistoryRecord]
>>>                   val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
>>>                   if (LOG.isInfoEnabled())
>>>                       LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL")
>>>                     ….
>>>                   playerState.update(players)
>>>                   handState.update(handHistoryInfo)
>>>               case "HoleCardHistory" =>
>>>                   val players = playerState.value()
>>>                   if (players != null) {
>>>                      ...
>>>                        playerState.update(players)
>>>                   } else LOG.warn("there is no player[hole card]. {}", in._4)
>>>               case "PlayerStateHistory" =>
>>>                   val players = playerState.value()
>>>                   if (players != null) {
>>>                      ….
>>>                       playerState.update(players)
>>>                   } else LOG.warn("there is no player[player state]. {}", in._4)
>>>               case "CommCardHistory" =>
>>>                   val handHistoryInfo = handState.value()
>>>                   val commCardHistory: CommCardHistory = commCardState.value()
>>>                   if (handHistoryInfo != null) {
>>>                      ...
>>>                       handState.update(handHistoryInfo)
>>>                       commCardState.update(commCardHistory)
>>>                   } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
>>>               case "PlayerActionHistory" =>
>>>                   val handHistoryInfo = handState.value()
>>>                   val players = playerState.value()
>>>
>>>                   if (handHistoryInfo != null) {
>>>                      ...
>>>                   } else LOG.warn("there is no handhistory info[player action]. {}", in._4)
>>>               case "PotHistory" =>
>>>                   val players = playerState.value()
>>>                   val handHistoryInfo = handState.value()
>>>                   val commCardHistory: CommCardHistory = commCardState.value()
>>>                   if (handHistoryInfo != null && handHistoryInfo.playType == PlayType.Cash && players != null && players.size > 1) {
>>>                       ...
>>>                   } else LOG.warn("there is no handhistory info[pot]. {}", in._4)
>>>               case "GameEndHistory" =>
>>>                   val players = playerState.value()
>>>                   val handHistoryInfo = handState.value()
>>>                      ...
>>>                   if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
>>>                   playerState.clear()
>>>                   handState.clear()
>>>               case _ =>
>>>           }
>>>       }
>>>
>>> —— log ——
>>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO  com.nsuslab.denma.stream.winloss.flow.Main$  - hand start 5769392597641628595
>>>
>>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN  com.nsuslab.denma.stream.winloss.flow.Main$  - there is no handhistory info[pot].
>>>
>>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <[hidden email]> wrote:
>>>>
>>>> What exactly do you mean by "lost"?
>>>>
>>>> You call value() and it returns a value (!= null/defaultValue) and you
>>>> call it again and it returns null/defaultValue for the same key with
>>>> no update in between?
>>>>
>>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
>>>> <[hidden email]> wrote:
>>>>> Hello,
>>>>>
>>>>> Could you share the code of the job you are running?
>>>>> With only this information I am afraid we cannot help much.
>>>>>
>>>>> Thanks,
>>>>> Kostas
>>>>>
>>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <[hidden email]> wrote:
>>>>>>
>>>>>> Hi.
>>>>>> I'm using Flink 1.0.3 on AWS EMR.
>>>>>> Sporadically, the value of a ValueState is lost.
>>>>>> What is a good starting point for solving this problem?
>>>>>> Thank you.
>>>>>
>>>
>>







Re: ValueState is missing

Stephan Ewen
In reply to this post by Dong-iL, Kim
Hi!

Concerning your latest questions:

  - There should not be multiple threads accessing the same state.
  - With "using a regular Java Map" I mean keeping everything as it is, except instead of using "ValueState" in the RichFlatMapFunction, you use a java.util.HashMap
  - If the program works within windows, it could be that events arrive out of order (are you using Event Time here?)
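
A minimal sketch of the "regular Java Map" debugging idea follows. It is plain Scala so that it runs without Flink; in the real job the class would extend RichFlatMapFunction. `HandInfo`, `ManualStateOperator` and `process` are hypothetical names. One assumption worth stating: ValueState is scoped to the current key automatically, whereas a plain map inside the function is shared by all keys handled by that parallel instance, so it has to be keyed by hand. The map is not checkpointed, so this is for diagnosis only.

```scala
import java.util.{HashMap => JHashMap}

// Debugging sketch only: this map is NOT fault tolerant and is lost on failure.
// HandInfo is a hypothetical stand-in for HandHistoryInfo.
object MapStateSketch {
  final case class HandInfo(handHistoryId: String)

  class ManualStateOperator {
    // One entry per key, replacing ValueState[HandHistoryInfo].
    private val handByKey = new JHashMap[String, HandInfo]()

    // Returns a log-style message describing what happened for this event.
    def process(key: String, kind: String): String = kind match {
      case "GameStartHistory" =>
        handByKey.put(key, HandInfo(key))
        s"hand start $key"
      case "GameEndHistory" =>
        handByKey.remove(key)
        s"end $key"
      case other =>
        if (handByKey.containsKey(key)) s"ok[$other] $key"
        else s"there is no handhistory info[$other]. $key"
    }
  }
}
```

If the warnings appear with this map in the same way as with ValueState, that points toward the program or the event order rather than toward Flink's state handling.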

Greetings,
Stephan



On Mon, Aug 15, 2016 at 9:56 AM, Dong-iL, Kim <[hidden email]> wrote:
Hi.
I've tested the program with a window function (keyBy -> window -> collect). It has no problem.

In my old program (keyBy -> state processing), can a single key be processed by multiple threads?

Thank you.

On Aug 12, 2016, at 8:27 PM, Stephan Ewen <[hidden email]> wrote:

Hi!

So far we are not aware of a state loss bug in Flink. My guess is that it is some subtlety in the program.

The check that logs also has other checks, like "handHistoryInfo.playType == PlayType.Cash" and "players.size > 1". Is one of them maybe the problem?
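
To see which of those checks actually fails, the combined condition can be split so that each failure is reported separately. This is a hedged sketch in plain Scala, not the code of the job: `potCheckFailures`, the `Tournament` play type, and the use of `Option` in place of `null` are assumptions made for illustration.

```scala
object PotCheckSketch {
  sealed trait PlayType
  case object Cash extends PlayType
  case object Tournament extends PlayType // hypothetical second play type

  final case class HandInfo(playType: PlayType)

  // Returns one message per failed condition instead of a single generic warning.
  // None stands in for a null state value.
  def potCheckFailures(hand: Option[HandInfo],
                       players: Option[Map[String, String]]): List[String] = {
    val failures = List.newBuilder[String]
    hand match {
      case None                          => failures += "handHistoryInfo is null"
      case Some(h) if h.playType != Cash => failures += s"playType is ${h.playType}, not Cash"
      case _                             => // hand is present and is a cash game
    }
    players match {
      case None                   => failures += "players is null"
      case Some(p) if p.size <= 1 => failures += s"players.size is ${p.size}"
      case _                      => // at least two players
    }
    failures.result()
  }
}
```

Logging the returned list in the else branch would distinguish a genuinely missing state from a non-cash hand or a hand with a single player.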


To debug this, you can try and do the following:

Rather than using Flink's key/value state, simply use your own java/scala map in the RichFlatMapFunction.
That is not by default fault-tolerant, but you can use that to see if the error occurs in the same way or not.

Greetings,
Stephan




On Fri, Aug 12, 2016 at 12:46 PM, Dong-iL, Kim <[hidden email]> wrote:
Hi.
I checked the order of the data, and it is all right.
Are there any other possibilities?
Thank you.


Re: ValueState is missing

Dong-iL, Kim
Hi.
I use ingestion time.
I didn't use a time window.
I used a GlobalWindow with a custom Trigger, shown below.
My apply() logic is the same as before, and it runs without complaint.
Thanks.

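    import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
    import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

    // Fires and purges the window when the final event of a hand arrives.
    class HandTrigger extends Trigger[(String, String, String, String, Long), GlobalWindow] {
        override def onElement(t: (String, String, String, String, Long), timestamp: Long, w: GlobalWindow,
                               triggerContext: TriggerContext): TriggerResult = {
            // "GameEndHistory" closes a hand: emit everything collected so far, then drop it.
            if (t._2 == "GameEndHistory") TriggerResult.FIRE_AND_PURGE
            else TriggerResult.CONTINUE
        }

        // Time plays no role here; only the element type decides when to fire.
        override def onProcessingTime(timestamp: Long, w: GlobalWindow, triggerContext: TriggerContext): TriggerResult = {
            TriggerResult.CONTINUE
        }

        override def onEventTime(timestamp: Long, w: GlobalWindow, triggerContext: TriggerContext): TriggerResult = {
            TriggerResult.CONTINUE
        }
    }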
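
What this trigger does to the data can be sketched in plain Scala, without Flink. The function below is a simulation under assumptions: the event is reduced to a hypothetical `(handId, kind)` pair, and `fireOnGameEnd` is an illustrative name. It buffers events per key and hands the whole buffer over once "GameEndHistory" arrives, which is roughly what apply() receives with FIRE_AND_PURGE.

```scala
object GlobalWindowSketch {
  type Event = (String, String) // (handId, kind); hypothetical simplification of the 5-tuple

  // Buffers events per key and emits the whole buffer when "GameEndHistory" arrives,
  // then drops it, imitating the FIRE_AND_PURGE behaviour of HandTrigger.
  def fireOnGameEnd(events: Seq[Event]): Seq[(String, Seq[String])] = {
    val buffers = scala.collection.mutable.LinkedHashMap.empty[String, Vector[String]]
    val fired = Seq.newBuilder[(String, Seq[String])]
    events.foreach { case (handId, kind) =>
      val buffered = buffers.getOrElse(handId, Vector.empty[String]) :+ kind
      if (kind == "GameEndHistory") {
        fired += (handId -> buffered) // FIRE: the complete buffer goes to apply()
        buffers.remove(handId)        // PURGE: discard the window contents
      } else {
        buffers(handId) = buffered    // CONTINUE: keep collecting
      }
    }
    fired.result()
  }
}
```

Because the function for a hand sees all of its buffered events at once, a "PotHistory" that arrives before "GameStartHistory" no longer shows up as missing state. That would be consistent with the out-of-order explanation raised earlier in the thread, although it does not prove it.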
