TumblingProcessingTimeWindow and ContinuousProcessingTimeTrigger

Hironori Ogibayashi
Hello,

I have a question about TumblingProcessingTimeWindow and
ContinuousProcessingTimeTrigger.

The code I tried is below. It outputs the distinct count of words: the
count should be printed every 5 seconds, and the window should reset every minute.

---
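    import java.sql.Timestamp
    import java.util.concurrent.TimeUnit

    import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger

    // env (StreamExecutionEnvironment) and fileName are defined elsewhere in the job
    val input =
      env.readFileStream(fileName, 100, FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
        .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
        .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
        .fold(Set[String]()) { (r, i) => r + i }  // collect the distinct words
        .map { x => (new Timestamp(System.currentTimeMillis()), x.size) }

    input.print()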
---

I appended data to the input file at intervals:

---
echo "aaa" >> input.txt
echo "aaa" >> input.txt
sleep 10
echo "bbb" >> input.txt
sleep 60
echo "ccc" >> input.txt
---

I expected the output 1, then 2 (10+ seconds later), then 1 (60+ seconds
later), but I got only one record:
---
(2016-03-18 13:08:59.288,2)
---
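To make the expectation concrete, here is a small plain-Scala model (no Flink dependency) of the timeline above. It assumes, as I understand Flink 1.0's TumblingProcessingTimeWindows to behave, that a 60-second window containing time t starts at t - (t % 60000), i.e. windows are aligned to the epoch rather than to the first element. `TumblingModel`, `assign` and `distinctCounts` are hypothetical names used only for illustration.

```scala
// Plain-Scala model (no Flink dependency) of tumbling processing-time windows.
// Assumption: a window of size `sizeMillis` containing time t starts at
// t - (t % sizeMillis), i.e. windows are epoch-aligned, not first-element-aligned.
object TumblingModel {
  /** A window covering [start, end) in milliseconds. */
  final case class Window(start: Long, end: Long)

  def assign(timestampMillis: Long, sizeMillis: Long): Window = {
    val start = timestampMillis - (timestampMillis % sizeMillis)
    Window(start, start + sizeMillis)
  }

  /** Distinct-word count per window, like the fold over Set[String] in the job. */
  def distinctCounts(events: Seq[(Long, String)], sizeMillis: Long): Map[Window, Int] =
    events
      .groupBy { case (t, _) => assign(t, sizeMillis) }
      .map { case (w, evs) => w -> evs.map(_._2).toSet.size }
}
```

Under this assumption, the expected 1, 2, 1 sequence only appears if "aaa" and "bbb" land in the same minute-aligned window; if the 10-second gap straddles a minute boundary, "bbb" starts a new window and its count restarts at 1.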

Even after several minutes, I got no additional records. My
understanding is that with
ContinuousProcessingTimeTrigger.of(Time.seconds(5)), the last two
operators (fold, map) in the code above are evaluated every 5
seconds.
Am I misunderstanding something?

Regards,
Hironori

Re: TumblingProcessingTimeWindow and ContinuousProcessingTimeTrigger

Aljoscha Krettek
Hi,
I’m afraid you discovered a bug in the ContinuousProcessingTimeTrigger: the timer is not set correctly. You can try this fixed version, which I will also apply to the Flink code: https://gist.github.com/aljoscha/cbdbd62932b6dd2d1930

One more thing: the ContinuousProcessingTimeTrigger never removes the window. The default EventTimeTrigger fires a window and purges its contents, while the ContinuousProcessingTimeTrigger only ever fires. This means that windows will accumulate in your state and will never be cleaned up. For now, if you need continuous firing on a TimeWindow, I would suggest writing a custom Trigger based on EventTimeTrigger (or ProcessingTimeTrigger) that fires and purges at the end of the window and also fires continuously at earlier times.
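The custom trigger suggested above can be sketched as a Flink-free model of its firing schedule. This is not Flink's Trigger API: `EarlyFiringModel`, `nextEarlyFire`, `onTimer` and `schedule` are hypothetical names. It assumes that early firings are aligned to multiples of the interval (consistent with the xx:x5.00x timestamps reported later in this thread) and that, as with Flink's TimeWindow, the last timestamp of a window is end - 1, which is where the final fire-and-purge happens.

```scala
// Hypothetical, Flink-free model of a trigger that fires early every
// `intervalMillis` and then fires *and purges* when the window ends.
// These names are illustrative only; they are not Flink's Trigger API.
object EarlyFiringModel {
  sealed trait Result
  case object Fire extends Result         // emit the current window contents
  case object FireAndPurge extends Result // emit, then drop the window state

  /** Next early-firing time after `now`, aligned to multiples of `intervalMillis`. */
  def nextEarlyFire(now: Long, intervalMillis: Long): Long =
    now - (now % intervalMillis) + intervalMillis

  /** Decision for a timer at `time` in a window whose last valid timestamp is `maxTs`. */
  def onTimer(time: Long, maxTs: Long): Result =
    if (time >= maxTs) FireAndPurge else Fire

  /**
   * All (time, result) pairs for one window, assuming the first element arrives
   * at `firstElement` and each early firing registers the next one.
   */
  def schedule(firstElement: Long, windowEnd: Long, intervalMillis: Long): Seq[(Long, Result)] = {
    val maxTs = windowEnd - 1 // assumption: last timestamp of [start, end) is end - 1
    val early = Iterator
      .iterate(nextEarlyFire(firstElement, intervalMillis))(_ + intervalMillis)
      .takeWhile(_ < maxTs)
      .map(t => t -> onTimer(t, maxTs))
      .toSeq
    early :+ (maxTs -> onTimer(maxTs, maxTs))
  }
}
```

For a 60 s window starting at 0 with the first element at 32000 ms, the model produces early firings at 35000, 40000, 45000, 50000 and 55000 ms, followed by a single fire-and-purge at 59999 ms.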

Let us know if you need more information about this. Kostas Kloudas has also recently looked into writing custom Triggers, so he may have some material he could give you.

Cheers,
Aljoscha

> On 18 Mar 2016, at 05:35, Hironori Ogibayashi wrote:
> [...]


Re: TumblingProcessingTimeWindow and ContinuousProcessingTimeTrigger

Hironori Ogibayashi
Aljoscha,

Thank you for fixing the issue.
I built both the Flink server and the job with the code you provided, and it
worked almost as expected.
The output is below. I am wondering why a value was emitted at
19:44:44.635 even though I set
ContinuousProcessingTimeTrigger.of(Time.seconds(5)), but it's not a
problem for me.

---
(2016-03-22 19:44:35.002,1)
(2016-03-22 19:44:44.635,2)
(2016-03-22 19:44:45.001,2)
(2016-03-22 19:45:45.001,1)
---

And regarding removal from the window: do you mean that the data remains
in the window even if
I use both .timeWindowAll and .trigger(ContinuousProcessingTimeTrigger)?
I thought that ContinuousProcessingTimeTrigger worked on top of
timeWindowAll, and that timeWindowAll
took care of purging data from the window.

---
.timeWindowAll(Time.of(60, TimeUnit.SECONDS))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
---

Regards,
Hironori

2016-03-21 18:56 GMT+09:00 Aljoscha Krettek:
> [...]

Re: TumblingProcessingTimeWindow and ContinuousProcessingTimeTrigger

Aljoscha Krettek
Hi,
the output at 19:44:44.635 is indeed strange. Is this reproducible?

As for the removal of windows: that is a pitfall many users have fallen into. The timeWindowAll() call only sets up a window assigner, so in your case the equivalent call would be:

     .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
     .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(60))) // <- difference is here
     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
     .fold(Set[String]()) { (r, i) => r + i }

The window assigner itself does not do any cleanup or trigger any window processing. It does, however, come with a default Trigger, which is ProcessingTimeTrigger in the case of TumblingProcessingTimeWindows. This trigger fires once at the end of a window and then also purges the window contents. Calling trigger() replaces the default trigger, and ContinuousProcessingTimeTrigger does not clean up (purge) windows.
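The replacement behaviour described above can be modelled without Flink as below. `TriggerReplacementModel` and all of its members are hypothetical; the model only encodes the two facts stated in this message: the assigner merely supplies a default trigger that `.trigger(...)` replaces outright, and only a purging trigger frees a window once it has ended.

```scala
// Flink-free sketch of the pitfall: .trigger(t) *replaces* the assigner's
// default trigger instead of adding to it. All names here are hypothetical.
object TriggerReplacementModel {
  /** A trigger, reduced to whether it purges a window's contents when the window ends. */
  final case class TriggerModel(name: String, purgesAtWindowEnd: Boolean)

  val processingTimeTrigger = TriggerModel("ProcessingTimeTrigger", purgesAtWindowEnd = true)
  val continuousTrigger = TriggerModel("ContinuousProcessingTimeTrigger", purgesAtWindowEnd = false)

  final case class WindowSetup(sizeMillis: Long, customTrigger: Option[TriggerModel] = None) {
    // The assigner only contributes a default; an explicit trigger wins outright.
    def effectiveTrigger: TriggerModel = customTrigger.getOrElse(processingTimeTrigger)
    def trigger(t: TriggerModel): WindowSetup = copy(customTrigger = Some(t))
  }

  /** Number of windows still held in state at `nowMillis`, given the element arrival times. */
  def windowsInState(setup: WindowSetup, elementTimes: Seq[Long], nowMillis: Long): Int = {
    val windowStarts = elementTimes.map(t => t - t % setup.sizeMillis).distinct
    val ended = (start: Long) => start + setup.sizeMillis <= nowMillis
    if (setup.effectiveTrigger.purgesAtWindowEnd) windowStarts.count(s => !ended(s))
    else windowStarts.size // never purged, so every window ever created stays in state
  }
}
```

With one element every 10 seconds for five minutes and 60 s windows, the default trigger leaves no windows in state at the five-minute mark, while the continuous trigger leaves all five.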

Since this seems to happen to a lot of people, I started an initiative to try and improve windows/triggers: https://mail-archives.apache.org/mod_mbox/flink-dev/201603.mbox/%3c16991435-118A-403B-B766-6349083254AB@...%3e

I created an associated doc to keep track of my proposed changes: https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing

What do you think?

Cheers,
Aljoscha

> On 23 Mar 2016, at 07:52, Hironori Ogibayashi wrote:
> [...]


Re: TumblingProcessingTimeWindow and ContinuousProcessingTimeTrigger

Hironori Ogibayashi
Aljoscha,

Yes, it has been reproducible every time I tried. Here is the code and
procedure: https://gist.github.com/ogibayashi/402153bcd79138c35d6a

Thank you for your explanation about the removal of windows. I didn't
know that calling .trigger() replaces the default
window trigger.
I have read your document. Personally, I think the current .window,
.trigger, and .evictor APIs are flexible enough if they
work as expected (i.e. calling .trigger does not replace the
window assigner's default trigger/evictor but just adds an additional
trigger), although I am new to Flink and may not understand the
problem correctly.
As for the "Testability and Test Coverage" section, I think it's a good idea
to use an internal clock instead of the system clock.
It will make it easy to test my streaming jobs.
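The "internal clock" idea can be sketched as follows: code reads time from an injected clock instead of calling System.currentTimeMillis() directly, so a test can advance time deterministically. All names here (`Clock`, `SystemClock`, `ManualClock`, `PeriodicDistinctCounter`) are hypothetical and not part of Flink's API; the proposal document may describe a different design.

```scala
// Sketch of an injectable clock. All names are hypothetical, not Flink API.
trait Clock { def nowMillis(): Long }

/** Production clock backed by the system time. */
object SystemClock extends Clock {
  def nowMillis(): Long = System.currentTimeMillis()
}

/** Test clock that only moves when the test advances it. */
final class ManualClock(private var current: Long = 0L) extends Clock {
  def nowMillis(): Long = current
  def advance(millis: Long): Unit = current += millis
}

/** Toy component: reports the distinct-word count each time `intervalMillis` has elapsed. */
final class PeriodicDistinctCounter(clock: Clock, intervalMillis: Long) {
  private var words = Set.empty[String]
  private var nextEmit = clock.nowMillis() + intervalMillis

  def add(word: String): Unit = words += word

  /** Returns Some(count) if an emission is due at the clock's current time, else None. */
  def poll(): Option[Int] =
    if (clock.nowMillis() >= nextEmit) {
      nextEmit += intervalMillis
      Some(words.size)
    } else None
}
```

A test can then assert on emissions at exact times (for example, nothing at 4999 ms and one emission at 5000 ms) without sleeping.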

Regards,
Hironori

2016-03-23 18:24 GMT+09:00 Aljoscha Krettek:
> [...]

Re: TumblingProcessingTimeWindow and ContinuousProcessingTimeTrigger

Kostas Kloudas
Hello Aljoscha and Hironori,

Nice initiative! I totally agree with the proposals in the document.
I also left some comments and I will soon start working on some
of the issues there.

Kostas

> On Mar 25, 2016, at 12:53 PM, Hironori Ogibayashi wrote:
> [...]