ContinuousProcessingTimeTrigger does not fire

Hironori Ogibayashi
Hi

I noticed that ContinuousProcessingTimeTrigger sometimes does not fire.

I asked a similar question before and applied this patch:
https://github.com/apache/flink/commit/607892314edee95da56f4997d85610f17a0dd470#diff-19bbcb3ea1403e483327408badfcd3f8
It seemed to work, but I still see strange behavior.

The code is:

----
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val input = env.readFileStream(fileName, 100, FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
      .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .windowAll(TumblingProcessingTimeWindows.of(Time.days(1)))
      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
      .fold(Set[String]()){(r,i) => { r + i}}
      .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}

    input print
---

In this case the base window is long (one day), so I simply expect a
cumulative distinct count of the values every 5 seconds.

I appended 8 strings to the input file at 1-second intervals:

---
% for i in `seq 1 8`; do date; echo "aa${i}" >> ~/tmp/input.txt; sleep 1; done
Wed Mar 30 20:51:36 JST 2016
Wed Mar 30 20:51:37 JST 2016
Wed Mar 30 20:51:38 JST 2016
Wed Mar 30 20:51:39 JST 2016
Wed Mar 30 20:51:40 JST 2016
Wed Mar 30 20:51:41 JST 2016
Wed Mar 30 20:51:42 JST 2016
Wed Mar 30 20:51:43 JST 2016
---

But I received only one output event. I expected another event 5
seconds later, but nothing came:

(2016-03-30 20:51:40.002,4)

Later, when I appended another line to the file, I got these events:

(2016-03-30 21:12:05.39,9)
(2016-03-30 21:12:10.001,9)

I slightly modified ContinuousProcessingTimeTrigger.java to add
logging in the onProcessingTime method. It looks like the method was
called at 20:51:40 and 21:12:10, not at 20:51:45 and 21:12:05.

----
2016-03-30 20:51:40,002 INFO org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger - onProcessingTime called: 2016-03-30 20:51:40.002
...
2016-03-30 21:12:10,001 INFO org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger - onProcessingTime called: 2016-03-30 21:12:10.001
----

Is this expected behavior?

Regards,
Hironori

Re: ContinuousProcessingTimeTrigger does not fire

Aljoscha Krettek
Hi,
yes, right now this is expected behavior. But I see that it can be a bit, well, unexpected.

The continuous trigger only sets its timer when new elements arrive, so the trigger fires again after five seconds only when you add new elements. If you want it to truly fire every five seconds even when no new elements arrive, you can change the "onProcessingTime" method to this:

@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {

    ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
    long nextFireTimestamp = fireState.value();

    // only fire if an element didn't already fire
    long currentTime = System.currentTimeMillis();
    if (currentTime > nextFireTimestamp) {
        long start = currentTime - (currentTime % interval);
        fireState.update(start + interval);
        ctx.registerProcessingTimeTimer(start + interval); // <-- I added this call
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}
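
A plain-Java model can make the effect of the added registerProcessingTimeTimer call concrete. This is a sketch under stated assumptions, not Flink code: `fireState` and the `timers` queue stand in for Flink's ValueState and timer service, the simplified `onElement` (which only plans the first timer) and the helpers `nextAligned` and `advanceTo` are hypothetical, and each timer callback is assumed to run 1 ms after its timestamp, as the logs in this thread suggest. `nextAligned` uses the same arithmetic as the snippet: round the current time down to a multiple of `interval`, then add one interval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model (no Flink types) of an onProcessingTime that re-registers
// its own timer. onElement is a hypothetical simplification; LATENESS is an
// assumed callback delay.
class ReRegisteringTriggerModel {
    static final long LATENESS = 1;                // assumed callback delay in ms
    final long interval;
    long fireState = 0;                            // next planned fire time, 0 = none
    final PriorityQueue<Long> timers = new PriorityQueue<>();
    final List<Long> fires = new ArrayList<>();

    ReRegisteringTriggerModel(long interval) { this.interval = interval; }

    // Round down to a multiple of interval, then add one interval.
    long nextAligned(long now) { return now - (now % interval) + interval; }

    // Hypothetical simplified onElement: only plans the first timer.
    void onElement(long now) {
        if (fireState == 0) {
            fireState = nextAligned(now);
            timers.add(fireState);
        }
    }

    // Mirrors the patched method: fire, then register the next timer.
    void onProcessingTime(long now) {
        if (now > fireState) {
            long next = nextAligned(now);
            fireState = next;
            timers.add(next);                      // the added registerProcessingTimeTimer call
            fires.add(now);
        }
    }

    // Deliver, in order, every timer whose callback time is <= t.
    void advanceTo(long t) {
        while (!timers.isEmpty() && timers.peek() + LATENESS <= t) {
            onProcessingTime(timers.poll() + LATENESS);
        }
    }

    public static void main(String[] args) {
        ReRegisteringTriggerModel m = new ReRegisteringTriggerModel(5_000);
        m.onElement(36_000);                       // one element at t = 36 s, then silence
        m.advanceTo(60_000);
        System.out.println(m.fires);               // prints [40001, 45001, 50001, 55001]
    }
}
```

With a single element at t = 36 s and a 5 s interval, the first timer lands on the 40 s boundary and every callback schedules the next one, so the model keeps firing on 5 s boundaries with no further input.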

I hope this helps. As I mentioned in the other thread, I'm currently thinking about how to make the triggers more intuitive, since right now they are not very easy to comprehend and their names can be misleading.

Cheers,
Aljoscha

On Wed, 30 Mar 2016 at 14:33 Hironori Ogibayashi <[hidden email]> wrote:

Re: ContinuousProcessingTimeTrigger does not fire

Hironori Ogibayashi
Aljoscha,

Thanks for your response.
I understand that the timer is only set when new elements arrive, but in
my previous example the trigger fired at 20:51:40.002 and new elements
then arrived at 20:51:41, 42 and 43. So why was the next timer not set
for 20:51:45?

It looks like the following happened:
- 20:51:40.002: onProcessingTime is called and the trigger fires. In the
same method, fireState is updated to 20:51:45, but
registerProcessingTimeTimer is not called, so the next timer is never
actually set.
- 20:51:41: the next element arrives and onElement is called. Since
currentTime (20:51:41) < nextFireTimestamp (20:51:45),
it just returns TriggerResult.CONTINUE, and again no timer is set.

I think the next timer should be set for 20:51:45 when an element arrives at 20:51:41.
Am I misunderstanding something?
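
The sequence above can be replayed in a small plain-Java model. It is a sketch, not Flink's source: onElement is reconstructed from this thread (it plans a timer when fireState is 0, fires and plans a timer when the planned time has passed, and otherwise just continues), times are milliseconds since local midnight, elements are assumed to reach the trigger 100 ms after each `echo`, and each timer callback is assumed to run 1 ms late. The names `BuggyTriggerModel`, `at`, `nextAligned`, `advanceTo` and `element` are hypothetical.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model (no Flink types): onProcessingTime updates fireState but
// registers no timer. onElement is reconstructed from the thread.
class BuggyTriggerModel {
    static final long LATENESS = 1;          // assumed callback delay in ms
    final long interval;
    long fireState = 0;                      // next planned fire time, 0 = none
    final PriorityQueue<Long> timers = new PriorityQueue<>();
    final List<Long> fires = new ArrayList<>();

    BuggyTriggerModel(long interval) { this.interval = interval; }

    // Milliseconds since local midnight.
    static long at(int h, int m, int s, int ms) {
        return ((h * 60L + m) * 60L + s) * 1000L + ms;
    }

    // Round down to a multiple of interval, then add one interval.
    long nextAligned(long now) { return now - (now % interval) + interval; }

    void onElement(long now) {
        if (fireState == 0) {                // first element: plan a timer
            fireState = nextAligned(now);
            timers.add(fireState);
        } else if (now > fireState) {        // planned time passed: fire, plan a timer
            fireState = nextAligned(now);
            timers.add(fireState);
            fires.add(now);
        }                                    // otherwise CONTINUE, no timer
    }

    void onProcessingTime(long now) {
        if (now > fireState) {
            fireState = nextAligned(now);    // state moves forward...
            fires.add(now);                  // ...but no timer is registered
        }
    }

    // Deliver, in order, every timer whose callback time is <= t.
    void advanceTo(long t) {
        while (!timers.isEmpty() && timers.peek() + LATENESS <= t) {
            onProcessingTime(timers.poll() + LATENESS);
        }
    }

    void element(long t) { advanceTo(t); onElement(t); }

    public static void main(String[] args) {
        BuggyTriggerModel m = new BuggyTriggerModel(5_000);
        for (int s = 36; s <= 43; s++) m.element(at(20, 51, s, 100)); // aa1..aa8
        m.element(at(21, 12, 5, 390));                                // the later line
        m.advanceTo(at(21, 13, 0, 0));
        for (long f : m.fires) System.out.println(LocalTime.ofNanoOfDay(f * 1_000_000L));
    }
}
```

Under these assumptions the model fires at 20:51:40.001, 21:12:05.390 and 21:12:10.001, and never at 20:51:45, which matches the shape of the output and logs in the first post: the only timer registered after 20:51:40 comes from the 21:12:05 element.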

Regards,
Hironori

2016-03-31 18:08 GMT+09:00 Aljoscha Krettek <[hidden email]>:


Re: ContinuousProcessingTimeTrigger does not fire

Aljoscha Krettek
Oh, I see what you mean now. As you said, I think the problem is that onProcessingTime changes nextFireTimestamp without actually registering a timer.

I think changing onProcessingTime to this should have the correct result:

@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {

    ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
    long nextFireTimestamp = fireState.value();

    // only fire if an element didn't already fire
    long currentTime = System.currentTimeMillis();
    if (currentTime > nextFireTimestamp) {
        fireState.update(0); // <- set to 0 so that onElement will set a timer 
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}

What do you think? With this change it should keep firing continuously, but only as long as new elements arrive.
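
Replaying the input from the first post against this variant in a plain-Java model shows the intended behavior. This is a sketch, not Flink code: onElement is reconstructed from the thread (it registers a timer whenever fireState is 0), times are milliseconds since local midnight, elements are assumed to arrive 100 ms after each `echo`, and timer callbacks are assumed to run 1 ms late, which the snippet's `>` comparison relies on. `ResetTriggerModel` and its helpers are hypothetical names.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model (no Flink types) of the reset-to-0 fix: after firing,
// onProcessingTime clears fireState so the next element registers a timer.
class ResetTriggerModel {
    static final long LATENESS = 1;          // assumed callback delay in ms
    final long interval;
    long fireState = 0;                      // next planned fire time, 0 = none
    final PriorityQueue<Long> timers = new PriorityQueue<>();
    final List<Long> fires = new ArrayList<>();

    ResetTriggerModel(long interval) { this.interval = interval; }

    // Milliseconds since local midnight.
    static long at(int h, int m, int s, int ms) {
        return ((h * 60L + m) * 60L + s) * 1000L + ms;
    }

    // Round down to a multiple of interval, then add one interval.
    long nextAligned(long now) { return now - (now % interval) + interval; }

    void onElement(long now) {
        if (fireState == 0) {                // no timer pending: plan one
            fireState = nextAligned(now);
            timers.add(fireState);
        } else if (now > fireState) {        // planned time passed: fire, plan a timer
            fireState = nextAligned(now);
            timers.add(fireState);
            fires.add(now);
        }
    }

    void onProcessingTime(long now) {
        if (now > fireState) {
            fireState = 0;                   // let the next element set a timer
            fires.add(now);
        }
    }

    // Deliver, in order, every timer whose callback time is <= t.
    void advanceTo(long t) {
        while (!timers.isEmpty() && timers.peek() + LATENESS <= t) {
            onProcessingTime(timers.poll() + LATENESS);
        }
    }

    void element(long t) { advanceTo(t); onElement(t); }

    public static void main(String[] args) {
        ResetTriggerModel m = new ResetTriggerModel(5_000);
        for (int s = 36; s <= 43; s++) m.element(at(20, 51, s, 100)); // aa1..aa8
        m.advanceTo(at(20, 52, 0, 0));
        for (long f : m.fires) System.out.println(LocalTime.ofNanoOfDay(f * 1_000_000L));
    }
}
```

With the eight appended lines this prints 20:51:40.001 and 20:51:45.001 and then stops: the element at 20:51:40.100 finds fireState at 0 and registers the 20:51:45 timer, and after that second fire no further element arrives to register another one.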

Cheers,
Aljoscha

On Thu, 31 Mar 2016 at 14:46 Hironori Ogibayashi <[hidden email]> wrote:

Re: ContinuousProcessingTimeTrigger does not fire

Hironori Ogibayashi
Aljoscha,

Thank you. That change looks good. I will try it.

Regards,
Hironori

2016-03-31 22:20 GMT+09:00 Aljoscha Krettek <[hidden email]>:


Re: ContinuousProcessingTimeTrigger does not fire

Hironori Ogibayashi
It worked as expected.
One more thing I needed to modify was the condition in onProcessingTime
and onElement, from

          if (currentTime > nextFireTimestamp) {

to

          if (currentTime >= nextFireTimestamp) {

because there was a case where currentTime and nextFireTimestamp were
equal, so the trigger did not fire.
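
The boundary case is easy to reproduce in isolation: a processing-time timer registered for time T can be delivered while System.currentTimeMillis() still returns T, and then `>` reports nothing to do. The sketch below is plain Java with hypothetical names (`FireConditionDemo`, `check`); it isolates only the comparison, not the whole trigger.

```java
// Minimal sketch of the fire condition used in onProcessingTime/onElement.
// 'inclusive' switches between ">" and ">=". No Flink types involved.
class FireConditionDemo {
    enum Result { FIRE, CONTINUE }

    static Result check(long currentTime, long nextFireTimestamp, boolean inclusive) {
        boolean due = inclusive ? currentTime >= nextFireTimestamp
                                : currentTime > nextFireTimestamp;
        return due ? Result.FIRE : Result.CONTINUE;
    }

    public static void main(String[] args) {
        long next = 75_105_000L;                             // 20:51:45.000 as ms since midnight
        System.out.println(check(next, next, false));        // CONTINUE: callback in the same ms is missed
        System.out.println(check(next, next, true));         // FIRE
        System.out.println(check(next + 1, next, false));    // FIRE: one ms later ">" works too
    }
}
```

With the reset-to-0 variant a missed check is costly: fireState keeps the old timestamp and no timer is pending, so, under the onElement behavior described in this thread, nothing fires until another element arrives.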

Thanks a lot for your help!

Regards,
Hironori


2016-04-01 0:15 GMT+09:00 Hironori Ogibayashi <[hidden email]>:
