I'm going to sort elements in a PriorityQueue and set up timers at (currentWatermark + 1), following the instructions in https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing.

However, it seems that context.timerService().currentWatermark() always returns Long.MinValue, so my onTimer is never called. Here's a minimal program to reproduce the problem. Am I missing something?

```
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
sEnv.setParallelism(argOps.parallelism())
sEnv.fromCollection(Seq[Long](1, 2, 3)).assignAscendingTimestamps(identity[Long])
  .process(new ProcessFunction[Long, Long] {
    override def processElement(i: Long, context: ProcessFunction[Long, Long]#Context, collector: Collector[Long]): Unit = {
      collector.collect(context.timerService().currentWatermark())
    }
  }).print()
sEnv.execute()
```

```
-9223372036854775808
-9223372036854775808
-9223372036854775808
```
|
Hi,

By default, the underlying implementation of `assignAscendingTimestamps` generates watermarks periodically. In your test program no watermark has been generated yet, and I think that's why the value is still Long.MinValue.

Regards,
Dian
|
It seems to be the case. But when I use timeWindow or CEP with fromCollection, it works well. For example,

```
sEnv.fromCollection(Seq[Long](1, 1002, 2002, 3002)).assignAscendingTimestamps(identity[Long])
  .keyBy(_ % 2).timeWindow(Time.seconds(1)).sum(0).print()
```

prints

```
1
1002
2002
3002
```

How can I implement my KeyedProcessFunction so that it works as expected?

On Mon, Oct 28, 2019 at 2:04 PM, Dian Fu <[hidden email]> wrote:
|
Before a program closes, it emits Long.MaxValue as the final watermark, and that watermark triggers all the windows. This is why your `timeWindow` program works. In the first program, however, you have not registered an event time timer (through context.timerService.registerEventTimeTimer), and there is also no onTimer logic defined to handle it.
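
To make the point about timers concrete, here is a minimal sketch, assuming Flink 1.9 with the Scala API on the classpath. The class name `EmitOnTimer` and the output string are made up for illustration; `registerEventTimeTimer`, `onTimer` and `currentWatermark` are the calls discussed in this thread.

```scala
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical example class: registers one event time timer per element
// and reports when that timer fires.
class EmitOnTimer extends KeyedProcessFunction[Long, Long, String] {

  override def processElement(
      value: Long,
      ctx: KeyedProcessFunction[Long, Long, String]#Context,
      out: Collector[String]): Unit = {
    // ctx.timestamp() is the element's event timestamp; it is only set
    // when timestamps were assigned upstream (assumed here).
    ctx.timerService().registerEventTimeTimer(ctx.timestamp())
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, Long, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    // Called once the watermark has reached the timer's timestamp.
    val watermark = ctx.timerService().currentWatermark()
    out.collect(s"timer $timestamp fired at watermark $watermark")
  }
}
```

It could be attached with something like `stream.keyBy(_ % 2).process(new EmitOnTimer)`. With a tiny `fromCollection` input, the timers would most likely fire only when the final Long.MaxValue watermark arrives.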
|
The reason why the watermark is not advancing is that assignAscendingTimestamps is a periodic watermark generator. This style of watermark generator is called at regular intervals to create watermarks -- by default, this is done every 200 msec. With only a tiny bit of data to process, the job doesn't run long enough for the watermark generator to ever be called.
|
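The periodic behaviour described above can be illustrated with a toy model in plain Scala. This is not Flink code: `ToyPeriodicGenerator` and its method names are invented for the sketch, and it assumes the watermark trails the highest timestamp seen by 1 and that a final Long.MaxValue watermark is emitted when the input ends.

```scala
// Toy model, not Flink code: a periodic generator only publishes a watermark
// when the runtime calls it on a wall-clock tick (every 200 ms by default).
final class ToyPeriodicGenerator {
  private var maxTimestamp = Long.MinValue // highest timestamp seen so far
  private var emitted = Long.MinValue      // last watermark actually published

  // Seeing an element updates internal bookkeeping but publishes nothing.
  def onElement(timestamp: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, timestamp)

  // Called by the (simulated) runtime on each periodic tick.
  def onPeriodicTick(): Unit =
    if (maxTimestamp != Long.MinValue)
      emitted = math.max(emitted, maxTimestamp - 1)

  // Called once when the bounded input ends (assumed final watermark).
  def onEndOfInput(): Unit = emitted = Long.MaxValue

  def currentWatermark: Long = emitted
}

object ToyPeriodicDemo {
  def main(args: Array[String]): Unit = {
    val gen = new ToyPeriodicGenerator
    // All three elements are processed before the first tick happens,
    // so every element observes Long.MinValue as the current watermark.
    Seq(1L, 2L, 3L).foreach { ts =>
      gen.onElement(ts)
      println(gen.currentWatermark)
    }
    gen.onEndOfInput()
    println(gen.currentWatermark) // the final watermark, Long.MaxValue
  }
}
```

In this model the watermark jumps straight from Long.MinValue to Long.MaxValue, which matches what the two programs in this thread observe.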
In reply to this post by Dian Fu
Thank you for your response. Registering a timer at Long.MaxValue works.

I have also found the mistake in my original code. When a timer fires and the priority queue still holds elements with timestamps greater than the current watermark, those elements do not get processed. A new timer should be registered for them. I simply forgot about these unprocessed elements.

On Mon, Oct 28, 2019 at 4:17 PM, Dian Fu <[hidden email]> wrote:
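
The fix described above can be sketched in plain Scala, outside Flink. `SortBuffer` and its methods are hypothetical names for illustration; in a real job the queue would live in keyed state and the returned timestamp would be passed to registerEventTimeTimer. The next timer is placed at (watermark + 1), following the approach from the first post.

```scala
import scala.collection.mutable

// Toy model of the buffering logic, not Flink code.
final class SortBuffer {
  // Reverse ordering turns the queue into a min-heap:
  // the smallest timestamp is dequeued first.
  private val queue =
    mutable.PriorityQueue.empty[Long](Ordering[Long].reverse)

  def add(timestamp: Long): Unit = queue.enqueue(timestamp)

  // Emits, in ascending order, every element with timestamp <= watermark.
  // If elements remain, also returns the time for a NEW timer, so that
  // the leftovers are not forgotten.
  def onTimer(watermark: Long): (List[Long], Option[Long]) = {
    val ready = mutable.ListBuffer.empty[Long]
    while (queue.nonEmpty && queue.head <= watermark) {
      ready += queue.dequeue()
    }
    // Remaining elements are all > watermark, so watermark + 1 cannot overflow.
    val nextTimer = if (queue.isEmpty) None else Some(watermark + 1)
    (ready.toList, nextTimer)
  }
}

object SortBufferDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new SortBuffer
    Seq(5L, 1L, 3L).foreach(buffer.add)
    // Watermark 3: emits 1 and 3, and asks for a new timer at 4.
    println(buffer.onTimer(3L))
    // Final watermark: emits 5, no further timer needed.
    println(buffer.onTimer(Long.MaxValue))
  }
}
```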
|