Hi: If I register both an event-time timer and a processing-time timer with the same timestamp for a particular key, will both fire or only one? If only one, what will its time domain be? Thanks
Also, are event-time timers and processing-time timers handled separately? That is, if I register an event-time timer and then use the same timestamp to delete a processing-time timer, will that remove the event-time timer registration?

In the example https://github.com/streaming-with-flink/examples-scala/blob/master/src/main/scala/io/github/streamingwithflink/chapter6/CoProcessFunctionTimers.scala#L87 should the delete and the register both be on the processing-time timer, or is it OK to delete an event-time timer and register a processing-time timer?

    override def processElement2(
        switch: (String, Long),
        ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context,
        out: Collector[SensorReading]): Unit = {
      // enable reading forwarding
      forwardingEnabled.update(true)
      // set disable forward timer
      val timerTimestamp = ctx.timerService().currentProcessingTime() + switch._2
      val curTimerTimestamp = disableTimer.value()
      if (timerTimestamp > curTimerTimestamp) {
        // remove current timer and register new timer
        ctx.timerService().deleteEventTimeTimer(curTimerTimestamp)
        ctx.timerService().registerProcessingTimeTimer(timerTimestamp)
        disableTimer.update(timerTimestamp)
      }
    }
On Sunday, July 14, 2019, 01:52:44 PM EDT, M Singh <[hidden email]> wrote:
Hey Folks - Just checking if you have any pointers for me. Thanks for your advice.
On Sunday, July 14, 2019, 03:12:25 PM EDT, M Singh <[hidden email]> wrote:
Hi,

Is it possible to use two different `TimeCharacteristic` settings in one job at the same time? I guess the answer is no, so I don't think such a scenario exists.

On Fri, Jul 19, 2019 at 12:19 AM, M Singh <[hidden email]> wrote:
Hi,

Event-time and processing-time timers have independent state storage. You can use both independently, so I would expect two firings with different time domains. `TimeCharacteristic` applies to operations where you do not explicitly specify the time type, like windowing.

Best,
Andrey

On Fri, Jul 19, 2019 at 8:18 AM Biao Liu <[hidden email]> wrote:
Hi Biao/Andrey:

Just to clarify, can we register two timers (one for processing time and one for event time) with the same timestamp, and if so, which one will fire? Also, is it OK to register an event-time timer and then deregister a processing-time timer (or vice versa)?

Here is the example I am referring to: https://github.com/streaming-with-flink/examples-scala/blob/master/src/main/scala/io/github/streamingwithflink/chapter6/CoProcessFunctionTimers.scala#L87

Should the delete and the register both be on the processing-time timer, or is it OK to delete an event-time timer and register a processing-time timer?

    override def processElement2(
        switch: (String, Long),
        ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context,
        out: Collector[SensorReading]): Unit = {
      // enable reading forwarding
      forwardingEnabled.update(true)
      // set disable forward timer
      val timerTimestamp = ctx.timerService().currentProcessingTime() + switch._2
      val curTimerTimestamp = disableTimer.value()
      if (timerTimestamp > curTimerTimestamp) {
        // remove current timer and register new timer
        ctx.timerService().deleteEventTimeTimer(curTimerTimestamp)
        ctx.timerService().registerProcessingTimeTimer(timerTimestamp)
        disableTimer.update(timerTimestamp)
      }
    }

Thanks again.
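If the two timer types are stored independently, as Andrey's earlier reply suggests, then deleting an event-time timer would presumably leave the previously registered processing-time timer in place. Under that assumption, a variant of the snippet that deletes and registers in the same time domain might look like the sketch below. The `SensorReading` case class, the `processElement1` and `onTimer` bodies, and the state descriptor names are hypothetical stand-ins, not copied from the linked repository.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

// Hypothetical stand-in for the SensorReading type used in the linked example.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// Sketch only: intended for two connected streams keyed by sensor id.
class ReadingFilter
    extends CoProcessFunction[SensorReading, (String, Long), SensorReading] {

  // Keyed state: whether forwarding is currently enabled for this key.
  lazy val forwardingEnabled: ValueState[java.lang.Boolean] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Boolean](
        "forwardingEnabled", classOf[java.lang.Boolean]))

  // Keyed state: timestamp of the currently registered processing-time timer.
  lazy val disableTimer: ValueState[java.lang.Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long](
        "disableTimer", classOf[java.lang.Long]))

  override def processElement1(
      reading: SensorReading,
      ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context,
      out: Collector[SensorReading]): Unit = {
    // Forward the reading only while forwarding is enabled (unset state is null).
    if (Option(forwardingEnabled.value()).exists(_.booleanValue)) {
      out.collect(reading)
    }
  }

  override def processElement2(
      switch: (String, Long),
      ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context,
      out: Collector[SensorReading]): Unit = {
    forwardingEnabled.update(true)
    val timerTimestamp = ctx.timerService().currentProcessingTime() + switch._2
    // Treat a missing timer as timestamp 0.
    val curTimerTimestamp =
      Option(disableTimer.value()).map(_.longValue).getOrElse(0L)
    if (timerTimestamp > curTimerTimestamp) {
      // Delete and register in the SAME time domain (processing time).
      ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)
      ctx.timerService().registerProcessingTimeTimer(timerTimestamp)
      disableTimer.update(timerTimestamp)
    }
  }

  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#OnTimerContext,
      out: Collector[SensorReading]): Unit = {
    // The timer fired: stop forwarding and clear the bookkeeping state.
    forwardingEnabled.clear()
    disableTimer.clear()
  }
}
```

The only change relative to the quoted `processElement2` is `deleteProcessingTimeTimer` in place of `deleteEventTimeTimer`; whether the original call was intentional is not settled in this thread.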
On Friday, July 19, 2019, 12:30:57 PM EDT, Andrey Zagrebin <[hidden email]> wrote:
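One way to check the behaviour Andrey describes would be a small `KeyedProcessFunction` that registers a timer in each domain with the same timestamp and reports the domain when `onTimer` is called. This is a minimal sketch, not a confirmed answer: the class name `BothDomainsTimers` and the `(key, timestamp)` input shape are assumptions made for illustration.

```scala
import org.apache.flink.streaming.api.TimeDomain
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Sketch only: input is a hypothetical (key, timestamp) pair on a stream keyed by _1.
class BothDomainsTimers
    extends KeyedProcessFunction[String, (String, Long), String] {

  override def processElement(
      value: (String, Long),
      ctx: KeyedProcessFunction[String, (String, Long), String]#Context,
      out: Collector[String]): Unit = {
    val ts = value._2
    // Same timestamp, two different time domains.
    ctx.timerService().registerEventTimeTimer(ts)
    ctx.timerService().registerProcessingTimeTimer(ts)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, (String, Long), String]#OnTimerContext,
      out: Collector[String]): Unit = {
    // timeDomain() tells which kind of timer triggered this call.
    ctx.timeDomain() match {
      case TimeDomain.EVENT_TIME =>
        out.collect(s"${ctx.getCurrentKey}: event-time timer fired at $timestamp")
      case TimeDomain.PROCESSING_TIME =>
        out.collect(s"${ctx.getCurrentKey}: processing-time timer fired at $timestamp")
    }
  }
}
```

If the timers are independent, `onTimer` should be invoked once per domain for the same key and timestamp: the event-time call when the watermark passes the timestamp, and the processing-time call when the wall clock reaches it, so the two firings need not happen at the same moment.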