Before a program closes, it emits Long.MaxValue as the final watermark, and that watermark triggers all the windows. This is why your `timeWindow` program works. In the first program, however, you have not registered an event-time timer (through context.timerService.registerEventTimeTimer), and there is also no onTimer logic defined to process it.

On Oct 28, 2019, at 4:01 PM, 杨力 <[hidden email]> wrote:

It seems to be the case. But when I use timeWindow or CEP with fromCollection, it works well. For example,

```
sEnv.fromCollection(Seq[Long](1, 1002, 2002, 3002))
  .assignAscendingTimestamps(identity[Long])
  .keyBy(_ % 2)
  .timeWindow(Time.seconds(1))
  .sum(0)
  .print()
```

prints

```
1
1002
2002
3002
```

How can I implement my KeyedProcessFunction so that it works as expected?

On Mon, Oct 28, 2019, at 2:04 PM, Dian Fu <[hidden email]> wrote:

Hi,

By default, the underlying implementation of `assignAscendingTimestamps` generates watermarks periodically. So in your test program the watermark has not been generated yet, and I think that's why it's Long.MinValue.

Regards,
Dian

On Oct 28, 2019, at 11:59 AM, 杨力 <[hidden email]> wrote:

I'm going to sort elements in a PriorityQueue and set up timers at (currentWatermark + 1), following the instructions in https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing.

However, it seems that context.timerService().currentWatermark() always returns Long.MinValue and my onTimer is never called. Here's a minimal program to reproduce the problem. Am I missing something?

```
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
sEnv.setParallelism(argOps.parallelism())
sEnv.fromCollection(Seq[Long](1, 2, 3))
  .assignAscendingTimestamps(identity[Long])
  .process(new ProcessFunction[Long, Long] {
    override def processElement(i: Long, context: ProcessFunction[Long, Long]#Context, collector: Collector[Long]): Unit = {
      collector.collect(context.timerService().currentWatermark())
    }
  })
  .print()
sEnv.execute()
```

```
-9223372036854775808
-9223372036854775808
-9223372036854775808
```
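
To make the suggestion in the reply at the top of this thread concrete (register an event-time timer and define onTimer), here is a minimal sketch, assuming the Flink 1.9 Scala DataStream API and reusing the input from the `timeWindow` example. The object name `TimerSketch` and the choice to emit the timer's timestamp are illustrative assumptions, not part of the original programs. It registers one timer per element at the element's own timestamp instead of reading currentWatermark() inside processElement.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical object name, used only for this sketch.
object TimerSketch {
  def main(args: Array[String]): Unit = {
    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    sEnv.fromCollection(Seq[Long](1, 1002, 2002, 3002))
      .assignAscendingTimestamps(identity[Long])
      .keyBy(_ % 2) // timers are only available on keyed streams
      .process(new KeyedProcessFunction[Long, Long, Long] {

        override def processElement(
            value: Long,
            ctx: KeyedProcessFunction[Long, Long, Long]#Context,
            out: Collector[Long]): Unit = {
          // Register an event-time timer at the element's own timestamp.
          // It fires once the watermark reaches or passes that timestamp.
          ctx.timerService().registerEventTimeTimer(ctx.timestamp())
        }

        override def onTimer(
            timestamp: Long,
            ctx: KeyedProcessFunction[Long, Long, Long]#OnTimerContext,
            out: Collector[Long]): Unit = {
          // Illustrative choice: emit the timestamp the timer was set for.
          out.collect(timestamp)
        }
      })
      .print()

    sEnv.execute()
  }
}
```

With a bounded source such as fromCollection, the periodic watermark may never be generated before the input ends, so these timers would be expected to fire when the final Long.MaxValue watermark arrives at shutdown, as described in the reply. Sorting buffered elements, as in the original question, would additionally need keyed state to hold the elements until their timer fires.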