Cannot get value from ValueState in ProcessingTimeTrigger

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Cannot get value from ValueState in ProcessingTimeTrigger

Utopia
Hi,

I want to get the last value stored in a ValueState when processing an element in a Trigger.

But, as the log shows, sometimes I can get the value and sometimes I cannot.

There is only one key in my data (SensorReading).

ValueState:
/**
 * Trigger that writes a counter into trigger-scoped `ValueState` for every element and fires
 * the window when a processing-time timer registered at `window.maxTimestamp` goes off.
 *
 * State obtained through `ctx.getPartitionedState` is scoped to the current key AND the current
 * window. With merging windows such as `ProcessingTimeSessionWindows`, a new element can change
 * the window, and the `ValueState` seen for the new window starts out empty (reads `null`).
 * NOTE(review): `OnMergeContext.mergePartitionedState` only accepts mergeable state descriptors,
 * so this `ValueState` does not carry over between merged windows. Per-key data that must survive
 * window changes belongs outside the trigger (e.g. a process function's per-key state) or in a
 * mergeable state type.
 */
class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {

  private lazy val descriptor = new ValueStateDescriptor[Long]("desc", classOf[Long])

  // NOTE(review): plain instance field, not Flink-managed state. It is therefore presumably
  // neither checkpointed nor separated per key; confirm before relying on it.
  var value = 1

  /** Records the current counter in the (key, window)-scoped state and schedules the firing timer. */
  override def onElement(
      r: SensorReading,
      timestamp: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    // Fetch the state handle once; it is bound to the current key and `window`.
    val state = ctx.getPartitionedState(descriptor)
    println("before update value : " + state.value())
    state.update(value)
    value += 1
    println("after update value: " + state.value())
    ctx.registerProcessingTimeTimer(window.maxTimestamp)
    TriggerResult.CONTINUE
  }

  /** Event time is ignored by this trigger. */
  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  /** Fires the window when the processing-time timer registered in onElement/onMerge goes off. */
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE

  /**
   * Releases everything this trigger holds for `window`: the timer and the partitioned
   * `ValueState` written in onElement. Without clearing the state, each window's entry
   * would never be removed.
   */
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    ctx.deleteProcessingTimeTimer(window.maxTimestamp)
    ctx.getPartitionedState(descriptor).clear()
  }

  /**
   * Re-registers the firing timer for the merged window if its end is still in the future.
   * The `ValueState` is not merged here (see the class note).
   */
  override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
    val windowMaxTimestamp = window.maxTimestamp
    if (windowMaxTimestamp > ctx.getCurrentProcessingTime) {
      ctx.registerProcessingTimeTimer(windowMaxTimestamp)
    }
  }

  override def canMerge: Boolean = true
}

Main process:
/** Entry point: runs the sensor stream through processing-time session windows with a custom trigger. */
object MyCustomWindows {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint interval: 10 seconds.
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
    // NOTE(review): in Flink 1.x, setStreamTimeCharacteristic also resets the auto-watermark
    // interval, so it must stay before setAutoWatermarkInterval. Confirm for your version.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(1000L)

    val readings: DataStream[SensorReading] =
      env.addSource(new SensorSource).assignTimestampsAndWatermarks(new SensorTimeAssigner)

    // Sessions are closed by a 1000-second processing-time gap.
    // NOTE(review): the result has no sink attached.
    val sessionResults = readings
      .keyBy(_.id)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
      .trigger(new ProcessingTimeTrigger)
      .process(new CountFunction)

    env.execute()
  }
}

Log results:

before update value : null
after update value: 1
before update value : null
after update value: 2
before update value : null
after update value: 3
before update value : 3
after update value: 4
before update value : null
after update value: 5
before update value : null
after update value: 6
before update value : null
after update value: 7
before update value : null
after update value: 8
before update value : null
after update value: 9
before update value : 9
after update value: 10


Best  regards
Utopia
Reply | Threaded
Open this post in threaded view
|

Re: Cannot get value from ValueState in ProcessingTimeTrigger

vino yang
Hi Utopia,

The behavior may be correct. 

First, the default value is null, and that is the correct value. `ValueStateDescriptor` has multiple constructors, and some of them let you specify a default value. However, those constructors are deprecated and the documentation does not recommend them [1]. With the other constructors, which cannot specify a default value, the default is null.

Second, there is a `keyBy` operation before the window. It partitions your data by key, and for each key the value state starts out as null.

Best,
Vino


Utopia <[hidden email]> 于2019年12月18日周三 下午7:20写道:
Hi,

I want to get the last value stored in a ValueState when processing an element in a Trigger.

But, as the log shows, sometimes I can get the value and sometimes I cannot.

There is only one key in my data (SensorReading).

ValueState:
/**
 * Trigger that writes a counter into trigger-scoped `ValueState` for every element and fires
 * the window when a processing-time timer registered at `window.maxTimestamp` goes off.
 *
 * State obtained through `ctx.getPartitionedState` is scoped to the current key AND the current
 * window. With merging windows such as `ProcessingTimeSessionWindows`, a new element can change
 * the window, and the `ValueState` seen for the new window starts out empty (reads `null`).
 * NOTE(review): `OnMergeContext.mergePartitionedState` only accepts mergeable state descriptors,
 * so this `ValueState` does not carry over between merged windows. Per-key data that must survive
 * window changes belongs outside the trigger (e.g. a process function's per-key state) or in a
 * mergeable state type.
 */
class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {

  private lazy val descriptor = new ValueStateDescriptor[Long]("desc", classOf[Long])

  // NOTE(review): plain instance field, not Flink-managed state. It is therefore presumably
  // neither checkpointed nor separated per key; confirm before relying on it.
  var value = 1

  /** Records the current counter in the (key, window)-scoped state and schedules the firing timer. */
  override def onElement(
      r: SensorReading,
      timestamp: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    // Fetch the state handle once; it is bound to the current key and `window`.
    val state = ctx.getPartitionedState(descriptor)
    println("before update value : " + state.value())
    state.update(value)
    value += 1
    println("after update value: " + state.value())
    ctx.registerProcessingTimeTimer(window.maxTimestamp)
    TriggerResult.CONTINUE
  }

  /** Event time is ignored by this trigger. */
  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  /** Fires the window when the processing-time timer registered in onElement/onMerge goes off. */
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE

  /**
   * Releases everything this trigger holds for `window`: the timer and the partitioned
   * `ValueState` written in onElement. Without clearing the state, each window's entry
   * would never be removed.
   */
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    ctx.deleteProcessingTimeTimer(window.maxTimestamp)
    ctx.getPartitionedState(descriptor).clear()
  }

  /**
   * Re-registers the firing timer for the merged window if its end is still in the future.
   * The `ValueState` is not merged here (see the class note).
   */
  override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
    val windowMaxTimestamp = window.maxTimestamp
    if (windowMaxTimestamp > ctx.getCurrentProcessingTime) {
      ctx.registerProcessingTimeTimer(windowMaxTimestamp)
    }
  }

  override def canMerge: Boolean = true
}

Main process:
/** Entry point: runs the sensor stream through processing-time session windows with a custom trigger. */
object MyCustomWindows {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint interval: 10 seconds.
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
    // NOTE(review): in Flink 1.x, setStreamTimeCharacteristic also resets the auto-watermark
    // interval, so it must stay before setAutoWatermarkInterval. Confirm for your version.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(1000L)

    val readings: DataStream[SensorReading] =
      env.addSource(new SensorSource).assignTimestampsAndWatermarks(new SensorTimeAssigner)

    // Sessions are closed by a 1000-second processing-time gap.
    // NOTE(review): the result has no sink attached.
    val sessionResults = readings
      .keyBy(_.id)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
      .trigger(new ProcessingTimeTrigger)
      .process(new CountFunction)

    env.execute()
  }
}

Log results:

before update value : null
after update value: 1
before update value : null
after update value: 2
before update value : null
after update value: 3
before update value : 3
after update value: 4
before update value : null
after update value: 5
before update value : null
after update value: 6
before update value : null
after update value: 7
before update value : null
after update value: 8
before update value : null
after update value: 9
before update value : 9
after update value: 10


Best  regards
Utopia
Reply | Threaded
Open this post in threaded view
|

Re: Cannot get value from ValueState in ProcessingTimeTrigger

Utopia
Hi Vino,

Thanks for your reply !

All records in my input data have the same key, so I think there is only one partition.

And why can I sometimes get the value stored in the ValueState before the update?
before update value : 3
after update value: 4

What’s more, how can I store the previous value so that I can read it when the next element comes in and the onElement method is invoked?



Best  regards
Utopia
在 2019年12月18日 +0800 21:57,vino yang <[hidden email]>,写道:
Hi Utopia,

The behavior may be correct. 

First, the default value is null, and that is the correct value. `ValueStateDescriptor` has multiple constructors, and some of them let you specify a default value. However, those constructors are deprecated and the documentation does not recommend them [1]. With the other constructors, which cannot specify a default value, the default is null.

Second, there is a `keyBy` operation before the window. It partitions your data by key, and for each key the value state starts out as null.

Best,
Vino


Utopia <[hidden email]> 于2019年12月18日周三 下午7:20写道:
Hi,

I want to get the last value stored in a ValueState when processing an element in a Trigger.

But, as the log shows, sometimes I can get the value and sometimes I cannot.

There is only one key in my data (SensorReading).

ValueState:
/**
 * Trigger that writes a counter into trigger-scoped `ValueState` for every element and fires
 * the window when a processing-time timer registered at `window.maxTimestamp` goes off.
 *
 * State obtained through `ctx.getPartitionedState` is scoped to the current key AND the current
 * window. With merging windows such as `ProcessingTimeSessionWindows`, a new element can change
 * the window, and the `ValueState` seen for the new window starts out empty (reads `null`).
 * NOTE(review): `OnMergeContext.mergePartitionedState` only accepts mergeable state descriptors,
 * so this `ValueState` does not carry over between merged windows. Per-key data that must survive
 * window changes belongs outside the trigger (e.g. a process function's per-key state) or in a
 * mergeable state type.
 */
class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {

  private lazy val descriptor = new ValueStateDescriptor[Long]("desc", classOf[Long])

  // NOTE(review): plain instance field, not Flink-managed state. It is therefore presumably
  // neither checkpointed nor separated per key; confirm before relying on it.
  var value = 1

  /** Records the current counter in the (key, window)-scoped state and schedules the firing timer. */
  override def onElement(
      r: SensorReading,
      timestamp: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    // Fetch the state handle once; it is bound to the current key and `window`.
    val state = ctx.getPartitionedState(descriptor)
    println("before update value : " + state.value())
    state.update(value)
    value += 1
    println("after update value: " + state.value())
    ctx.registerProcessingTimeTimer(window.maxTimestamp)
    TriggerResult.CONTINUE
  }

  /** Event time is ignored by this trigger. */
  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  /** Fires the window when the processing-time timer registered in onElement/onMerge goes off. */
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE

  /**
   * Releases everything this trigger holds for `window`: the timer and the partitioned
   * `ValueState` written in onElement. Without clearing the state, each window's entry
   * would never be removed.
   */
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    ctx.deleteProcessingTimeTimer(window.maxTimestamp)
    ctx.getPartitionedState(descriptor).clear()
  }

  /**
   * Re-registers the firing timer for the merged window if its end is still in the future.
   * The `ValueState` is not merged here (see the class note).
   */
  override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
    val windowMaxTimestamp = window.maxTimestamp
    if (windowMaxTimestamp > ctx.getCurrentProcessingTime) {
      ctx.registerProcessingTimeTimer(windowMaxTimestamp)
    }
  }

  override def canMerge: Boolean = true
}

Main process:
/** Entry point: runs the sensor stream through processing-time session windows with a custom trigger. */
object MyCustomWindows {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint interval: 10 seconds.
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
    // NOTE(review): in Flink 1.x, setStreamTimeCharacteristic also resets the auto-watermark
    // interval, so it must stay before setAutoWatermarkInterval. Confirm for your version.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(1000L)

    val readings: DataStream[SensorReading] =
      env.addSource(new SensorSource).assignTimestampsAndWatermarks(new SensorTimeAssigner)

    // Sessions are closed by a 1000-second processing-time gap.
    // NOTE(review): the result has no sink attached.
    val sessionResults = readings
      .keyBy(_.id)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
      .trigger(new ProcessingTimeTrigger)
      .process(new CountFunction)

    env.execute()
  }
}

Log results:

before update value : null
after update value: 1
before update value : null
after update value: 2
before update value : null
after update value: 3
before update value : 3
after update value: 4
before update value : null
after update value: 5
before update value : null
after update value: 6
before update value : null
after update value: 7
before update value : null
after update value: 8
before update value : null
after update value: 9
before update value : 9
after update value: 10


Best  regards
Utopia
Reply | Threaded
Open this post in threaded view
|

Re: Cannot get value from ValueState in ProcessingTimeTrigger

Utopia
Hi Vino,

Maybe it is due to the window type. I am using ProcessingTimeSessionWindows, and keyed state is scoped to both the window and the key. When the window changes, a different ValueState is used.

Best  regards
Utopia
在 2019年12月18日 +0800 22:30,Utopia <[hidden email]>,写道:
Hi Vino,

Thanks for your reply !

All records in my input data have the same key, so I think there is only one partition.

And why can I sometimes get the value stored in the ValueState before the update?
before update value : 3
after update value: 4

What’s more, how can I store the previous value so that I can read it when the next element comes in and the onElement method is invoked?



Best  regards
Utopia
在 2019年12月18日 +0800 21:57,vino yang <[hidden email]>,写道:
Hi Utopia,

The behavior may be correct. 

First, the default value is null, and that is the correct value. `ValueStateDescriptor` has multiple constructors, and some of them let you specify a default value. However, those constructors are deprecated and the documentation does not recommend them [1]. With the other constructors, which cannot specify a default value, the default is null.

Second, there is a `keyBy` operation before the window. It partitions your data by key, and for each key the value state starts out as null.

Best,
Vino


Utopia <[hidden email]> 于2019年12月18日周三 下午7:20写道:
Hi,

I want to get the last value stored in a ValueState when processing an element in a Trigger.

But, as the log shows, sometimes I can get the value and sometimes I cannot.

There is only one key in my data (SensorReading).

ValueState:
/**
 * Trigger that writes a counter into trigger-scoped `ValueState` for every element and fires
 * the window when a processing-time timer registered at `window.maxTimestamp` goes off.
 *
 * State obtained through `ctx.getPartitionedState` is scoped to the current key AND the current
 * window. With merging windows such as `ProcessingTimeSessionWindows`, a new element can change
 * the window, and the `ValueState` seen for the new window starts out empty (reads `null`).
 * NOTE(review): `OnMergeContext.mergePartitionedState` only accepts mergeable state descriptors,
 * so this `ValueState` does not carry over between merged windows. Per-key data that must survive
 * window changes belongs outside the trigger (e.g. a process function's per-key state) or in a
 * mergeable state type.
 */
class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {

  private lazy val descriptor = new ValueStateDescriptor[Long]("desc", classOf[Long])

  // NOTE(review): plain instance field, not Flink-managed state. It is therefore presumably
  // neither checkpointed nor separated per key; confirm before relying on it.
  var value = 1

  /** Records the current counter in the (key, window)-scoped state and schedules the firing timer. */
  override def onElement(
      r: SensorReading,
      timestamp: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    // Fetch the state handle once; it is bound to the current key and `window`.
    val state = ctx.getPartitionedState(descriptor)
    println("before update value : " + state.value())
    state.update(value)
    value += 1
    println("after update value: " + state.value())
    ctx.registerProcessingTimeTimer(window.maxTimestamp)
    TriggerResult.CONTINUE
  }

  /** Event time is ignored by this trigger. */
  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  /** Fires the window when the processing-time timer registered in onElement/onMerge goes off. */
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE

  /**
   * Releases everything this trigger holds for `window`: the timer and the partitioned
   * `ValueState` written in onElement. Without clearing the state, each window's entry
   * would never be removed.
   */
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    ctx.deleteProcessingTimeTimer(window.maxTimestamp)
    ctx.getPartitionedState(descriptor).clear()
  }

  /**
   * Re-registers the firing timer for the merged window if its end is still in the future.
   * The `ValueState` is not merged here (see the class note).
   */
  override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
    val windowMaxTimestamp = window.maxTimestamp
    if (windowMaxTimestamp > ctx.getCurrentProcessingTime) {
      ctx.registerProcessingTimeTimer(windowMaxTimestamp)
    }
  }

  override def canMerge: Boolean = true
}

Main process:
/** Entry point: runs the sensor stream through processing-time session windows with a custom trigger. */
object MyCustomWindows {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint interval: 10 seconds.
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
    // NOTE(review): in Flink 1.x, setStreamTimeCharacteristic also resets the auto-watermark
    // interval, so it must stay before setAutoWatermarkInterval. Confirm for your version.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(1000L)

    val readings: DataStream[SensorReading] =
      env.addSource(new SensorSource).assignTimestampsAndWatermarks(new SensorTimeAssigner)

    // Sessions are closed by a 1000-second processing-time gap.
    // NOTE(review): the result has no sink attached.
    val sessionResults = readings
      .keyBy(_.id)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
      .trigger(new ProcessingTimeTrigger)
      .process(new CountFunction)

    env.execute()
  }
}

Log results:

before update value : null
after update value: 1
before update value : null
after update value: 2
before update value : null
after update value: 3
before update value : 3
after update value: 4
before update value : null
after update value: 5
before update value : null
after update value: 6
before update value : null
after update value: 7
before update value : null
after update value: 8
before update value : null
after update value: 9
before update value : 9
after update value: 10


Best  regards
Utopia
Reply | Threaded
Open this post in threaded view
|

Re: Cannot get value from ValueState in ProcessingTimeTrigger

vino yang
Hi Utopia,

IMO, your analysis is correct.

Best,
Vino

Utopia <[hidden email]> 于2019年12月19日周四 上午12:44写道:
Hi Vino,

Maybe it is due to the window type. I am using ProcessingTimeSessionWindows, and keyed state is scoped to both the window and the key. When the window changes, a different ValueState is used.

Best  regards
Utopia
在 2019年12月18日 +0800 22:30,Utopia <[hidden email]>,写道:
Hi Vino,

Thanks for your reply !

All records in my input data have the same key, so I think there is only one partition.

And why can I sometimes get the value stored in the ValueState before the update?
before update value : 3
after update value: 4

What’s more, how can I store the previous value so that I can read it when the next element comes in and the onElement method is invoked?



Best  regards
Utopia
在 2019年12月18日 +0800 21:57,vino yang <[hidden email]>,写道:
Hi Utopia,

The behavior may be correct. 

First, the default value is null, and that is the correct value. `ValueStateDescriptor` has multiple constructors, and some of them let you specify a default value. However, those constructors are deprecated and the documentation does not recommend them [1]. With the other constructors, which cannot specify a default value, the default is null.

Second, there is a `keyBy` operation before the window. It partitions your data by key, and for each key the value state starts out as null.

Best,
Vino


Utopia <[hidden email]> 于2019年12月18日周三 下午7:20写道:
Hi,

I want to get the last value stored in a ValueState when processing an element in a Trigger.

But, as the log shows, sometimes I can get the value and sometimes I cannot.

There is only one key in my data (SensorReading).

ValueState:
/**
 * Trigger that writes a counter into trigger-scoped `ValueState` for every element and fires
 * the window when a processing-time timer registered at `window.maxTimestamp` goes off.
 *
 * State obtained through `ctx.getPartitionedState` is scoped to the current key AND the current
 * window. With merging windows such as `ProcessingTimeSessionWindows`, a new element can change
 * the window, and the `ValueState` seen for the new window starts out empty (reads `null`).
 * NOTE(review): `OnMergeContext.mergePartitionedState` only accepts mergeable state descriptors,
 * so this `ValueState` does not carry over between merged windows. Per-key data that must survive
 * window changes belongs outside the trigger (e.g. a process function's per-key state) or in a
 * mergeable state type.
 */
class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {

  private lazy val descriptor = new ValueStateDescriptor[Long]("desc", classOf[Long])

  // NOTE(review): plain instance field, not Flink-managed state. It is therefore presumably
  // neither checkpointed nor separated per key; confirm before relying on it.
  var value = 1

  /** Records the current counter in the (key, window)-scoped state and schedules the firing timer. */
  override def onElement(
      r: SensorReading,
      timestamp: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    // Fetch the state handle once; it is bound to the current key and `window`.
    val state = ctx.getPartitionedState(descriptor)
    println("before update value : " + state.value())
    state.update(value)
    value += 1
    println("after update value: " + state.value())
    ctx.registerProcessingTimeTimer(window.maxTimestamp)
    TriggerResult.CONTINUE
  }

  /** Event time is ignored by this trigger. */
  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  /** Fires the window when the processing-time timer registered in onElement/onMerge goes off. */
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE

  /**
   * Releases everything this trigger holds for `window`: the timer and the partitioned
   * `ValueState` written in onElement. Without clearing the state, each window's entry
   * would never be removed.
   */
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    ctx.deleteProcessingTimeTimer(window.maxTimestamp)
    ctx.getPartitionedState(descriptor).clear()
  }

  /**
   * Re-registers the firing timer for the merged window if its end is still in the future.
   * The `ValueState` is not merged here (see the class note).
   */
  override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
    val windowMaxTimestamp = window.maxTimestamp
    if (windowMaxTimestamp > ctx.getCurrentProcessingTime) {
      ctx.registerProcessingTimeTimer(windowMaxTimestamp)
    }
  }

  override def canMerge: Boolean = true
}

Main process:
/** Entry point: runs the sensor stream through processing-time session windows with a custom trigger. */
object MyCustomWindows {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint interval: 10 seconds.
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
    // NOTE(review): in Flink 1.x, setStreamTimeCharacteristic also resets the auto-watermark
    // interval, so it must stay before setAutoWatermarkInterval. Confirm for your version.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(1000L)

    val readings: DataStream[SensorReading] =
      env.addSource(new SensorSource).assignTimestampsAndWatermarks(new SensorTimeAssigner)

    // Sessions are closed by a 1000-second processing-time gap.
    // NOTE(review): the result has no sink attached.
    val sessionResults = readings
      .keyBy(_.id)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
      .trigger(new ProcessingTimeTrigger)
      .process(new CountFunction)

    env.execute()
  }
}

Log results:

before update value : null
after update value: 1
before update value : null
after update value: 2
before update value : null
after update value: 3
before update value : 3
after update value: 4
before update value : null
after update value: 5
before update value : null
after update value: 6
before update value : null
after update value: 7
before update value : null
after update value: 8
before update value : null
after update value: 9
before update value : 9
after update value: 10


Best  regards
Utopia