Hi,

I noticed that ContinuousProcessingTimeTrigger sometimes does not fire.

I asked a similar question before and applied this patch:
https://github.com/apache/flink/commit/607892314edee95da56f4997d85610f17a0dd470#diff-19bbcb3ea1403e483327408badfcd3f8
It looked like it worked, but I still see strange behavior.

The code is:

----
val env = StreamExecutionEnvironment.getExecutionEnvironment
val input =
  env.readFileStream(fileName, 100, FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
    .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
    .windowAll(TumblingProcessingTimeWindows.of(Time.days(1)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
    .fold(Set[String]()) { (r, i) => r + i }
    .map { x => (new Timestamp(System.currentTimeMillis()), x.size) }

input.print()
----

In this case the base window is long, so I just expect a cumulative distinct count of the values every 5 seconds.

I appended 8 strings to the input file at 1-second intervals.

----
% for i in `seq 1 8`; do date; echo "aa${i}" >> ~/tmp/input.txt; sleep 1; done
Wed Mar 30 20:51:36 JST 2016
Wed Mar 30 20:51:37 JST 2016
Wed Mar 30 20:51:38 JST 2016
Wed Mar 30 20:51:39 JST 2016
Wed Mar 30 20:51:40 JST 2016
Wed Mar 30 20:51:41 JST 2016
Wed Mar 30 20:51:42 JST 2016
Wed Mar 30 20:51:43 JST 2016
----

But I received only one output event. I should have received one more event 5 seconds later, but nothing came.

(2016-03-30 20:51:40.002,4)

Later, when I appended an additional line to the file, I got these events:

(2016-03-30 21:12:05.39,9)
(2016-03-30 21:12:10.001,9)

I slightly modified ContinuousProcessingTimeTrigger.java and added logging to the onProcessingTime method. It looks like the method was called at 20:51:40 and 21:12:10, but not at 20:51:45 and 21:12:05.

----
2016-03-30 20:51:40,002 INFO org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger - onProcessingTime called: 2016-03-30 20:51:40.002
...
2016-03-30 21:12:10,001 INFO org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger - onProcessingTime called: 2016-03-30 21:12:10.001
----

Is this expected behavior?

Regards,
Hironori

Hi,
yes, right now this is expected behavior. But I see that it can be a bit, well, unexpected.

The continuous trigger is only set when new elements arrive, so only when you put new elements does the trigger fire again after five seconds. If you want it to truly continuously fire every five seconds even though no new elements arrived you can change the "onProcessingTime" method to this:

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {

        ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
        long nextFireTimestamp = fireState.value();

        // only fire if an element didn't already fire
        long currentTime = System.currentTimeMillis();
        if (currentTime > nextFireTimestamp) {
            long start = currentTime - (currentTime % interval);
            fireState.update(start + interval);
            ctx.registerProcessingTimeTimer(start + interval); // <-- I added this call
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

I hope this helps. As I mentioned in the other thread I'm currently thinking about how to make the triggers more intuitive since right now they are not very easy to comprehend because the names can also be misleading.

Cheers,
Aljoscha

On Wed, 30 Mar 2016 at 14:33 Hironori Ogibayashi <[hidden email]> wrote:
> Hi
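
The alignment arithmetic in the snippet above (round the current time down to a multiple of the interval, then add one interval) can be checked in isolation. The following is a minimal plain-Java sketch with no Flink dependency; the class name `NextFireTime` and the method `nextFireTimestamp` are hypothetical names chosen for illustration, and only the expression itself is taken from the snippet.

```java
/** Illustration only: not part of Flink. */
final class NextFireTime {

    /** Rounds nowMillis down to a multiple of intervalMillis, then adds one interval. */
    static long nextFireTimestamp(long nowMillis, long intervalMillis) {
        long start = nowMillis - (nowMillis % intervalMillis);
        return start + intervalMillis;
    }

    public static void main(String[] args) {
        long interval = 5_000L; // 5 seconds, as in Time.seconds(5)

        // 40.002 s, written in milliseconds: the next aligned fire time is 45.000 s.
        System.out.println(nextFireTimestamp(40_002L, interval)); // 45000

        // A time that is already aligned moves one full interval ahead.
        System.out.println(nextFireTimestamp(45_000L, interval)); // 50000
    }
}
```

Small millisecond offsets are used here instead of real epoch timestamps to keep the numbers readable; the arithmetic is the same.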

Aljoscha,

Thanks for your response.
I understand that the trigger is only set when new elements arrive, but in my previous example the trigger fired at 20:51:40.002, and then new elements arrived at 20:51:41, 42, and 43. So why was the next trigger not set for 20:51:45?

It looks like the following happened:
- 20:51:40.002: onProcessingTime is called and the trigger fires. In the same method, fireState is updated to 20:51:45, but registerProcessingTimeTimer is not called, so the next timer is not actually set.
- 20:51:41: the next element comes and onElement is called. Since currentTime (20:51:41) < nextFireTimestamp (20:51:45), it just returns TriggerResult.CONTINUE. The next timer is not set.

I think the next timer should be set for 20:51:45 when an element comes at 20:51:41.
Am I misunderstanding?

Regards,
Hironori

2016-03-31 18:08 GMT+09:00 Aljoscha Krettek <[hidden email]>:
> Hi,
> yes, right now this is expected behavior. But I see that it can be a bit,
> well, unexpected.

Oh I see what you mean now. I think the problem is that onProcessingTime changes nextFireTimestamp without actually setting a timer, as you said.

I think changing onProcessingTime to this should have the correct result:

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {

        ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
        long nextFireTimestamp = fireState.value();

        // only fire if an element didn't already fire
        long currentTime = System.currentTimeMillis();
        if (currentTime > nextFireTimestamp) {
            fireState.update(0); // <- set to 0 so that onElement will set a timer
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

What do you think? This should have the behavior that it continuously fires, but only if new elements arrive.

Cheers,
Aljoscha

On Thu, 31 Mar 2016 at 14:46 Hironori Ogibayashi <[hidden email]> wrote:
> Aljoscha,
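
The difference between the two onProcessingTime variants can be replayed with a small stand-alone model. This is only a sketch under stated assumptions, not Flink code: `TriggerModel` is a hypothetical class, its `onElement` logic is inferred from the description in this thread rather than copied from the Flink source, a `TreeSet` stands in for the registered processing-time timers, and the element arrival times after the first fire (40.5 s to 43.5 s, in milliseconds) are approximations of the timeline reported above.

```java
import java.util.TreeSet;

/** Simplified model of the trigger logic discussed in this thread (not Flink code). */
final class TriggerModel {
    final long interval;
    final boolean resetOnFire;                    // true = variant that sets the state to 0
    long nextFire = 0;                            // 0 means "no fire time scheduled"
    final TreeSet<Long> timers = new TreeSet<>(); // stand-in for registered timers

    TriggerModel(long interval, boolean resetOnFire) {
        this.interval = interval;
        this.resetOnFire = resetOnFire;
    }

    /** Assumed onElement behavior; returns true if the trigger fires. */
    boolean onElement(long now) {
        boolean fire = nextFire != 0 && now > nextFire;
        if (nextFire == 0 || fire) {
            nextFire = now - (now % interval) + interval;
            timers.add(nextFire);                 // a timer is registered only here
        }
        return fire;
    }

    /** Timer callback; returns true if the trigger fires. */
    boolean onProcessingTime(long timer, long now) {
        timers.remove(timer);
        if (now > nextFire) {
            // Either reset the state, or advance it without registering a timer.
            nextFire = resetOnFire ? 0 : now - (now % interval) + interval;
            return true;
        }
        return false;
    }

    /** Replays the timeline from this thread; returns the timers still pending. */
    static TreeSet<Long> replay(boolean resetOnFire) {
        TriggerModel t = new TriggerModel(5_000L, resetOnFire);
        for (long now = 36_000L; now <= 39_000L; now += 1_000L) t.onElement(now);
        t.onProcessingTime(40_000L, 40_002L);     // the fire logged at 20:51:40.002
        for (long now = 40_500L; now <= 43_500L; now += 1_000L) t.onElement(now);
        return t.timers;
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // []      -> nothing will fire at 45 s
        System.out.println(replay(true));  // [45000] -> a timer is pending for 45 s
    }
}
```

In this model, advancing the state without registering a timer leaves no pending timer, which matches the stall described above, while resetting the state to 0 lets the next element schedule the 45-second fire.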

Aljoscha,

Thank you. That change looks good. I will try.

Regards,
Hironori

2016-03-31 22:20 GMT+09:00 Aljoscha Krettek <[hidden email]>:
> Oh I see what you mean now. I think the problem is that onProcessingTime
> changes nextFireTimestamp without actually setting a Trigger, as you said.

It worked as expected.

One thing I also needed to modify was the condition in onProcessingTime and onElement, from

    if (currentTime > nextFireTimestamp) {

to

    if (currentTime >= nextFireTimestamp) {

because there were cases where currentTime and nextFireTimestamp were equal, so the trigger did not fire.

Thanks a lot for your help!

Regards,
Hironori

2016-04-01 0:15 GMT+09:00 Hironori Ogibayashi <[hidden email]>:
> Aljoscha,
>
> Thank you. That change looks good. I will try.
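
The boundary case behind the change from `>` to `>=` can be shown with plain arithmetic. This is a small illustrative sketch; the class `FireCondition` and its two methods are hypothetical names, and the timestamps are example offsets in milliseconds rather than values from the logs.

```java
/** Illustration only: compares the strict and inclusive fire conditions. */
final class FireCondition {

    /** Original condition: misses the case where both values are equal. */
    static boolean firesStrict(long currentTime, long nextFireTimestamp) {
        return currentTime > nextFireTimestamp;
    }

    /** Modified condition: also fires when the callback runs exactly on time. */
    static boolean firesInclusive(long currentTime, long nextFireTimestamp) {
        return currentTime >= nextFireTimestamp;
    }

    public static void main(String[] args) {
        long nextFire = 45_000L;

        // Callback runs in the same millisecond the timer was scheduled for.
        System.out.println(firesStrict(45_000L, nextFire));    // false
        System.out.println(firesInclusive(45_000L, nextFire)); // true

        // Callback runs one millisecond late: both conditions fire.
        System.out.println(firesStrict(45_001L, nextFire));    // true
        System.out.println(firesInclusive(45_001L, nextFire)); // true
    }
}
```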