Behavior of SlidingProcessingTimeWindow with CountTrigger


vishnuviswanath

Hi All,


I have the following code:


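import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

val sev = StreamExecutionEnvironment.getExecutionEnvironment
val socTextStream = sev.socketTextStream("localhost", 4444)

val counts = socTextStream.flatMap { _.split("\\s") }
  .map { (_, 1) }
  .keyBy(0)
  // 20 s windows starting every 10 s, so each element lands in two windows
  .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
  // replaces the default trigger: fire whenever a window has received 5 elements
  .trigger(CountTrigger.of(5))
  .sum(1)

counts.print()
sev.execute()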

I am sending messages to port 4444 using nc -lk 4444.
This is my sample input:

a
a
a
a
a
b
b
b
b
b
c
c
c
c
c
d
d
d
d
d
e
e
e
e
e

I am sending 5 of each letter since I have a CountTrigger of 5. I expected that for every 5 characters the code would print 5, i.e., (a,5), (b,5), etc. But the output I am getting is a little confusing.
Output:

1> (a,5)
1> (a,5)
1> (b,5)
2> (c,5)
2> (c,5)
1> (d,5)
1> (e,5)
1> (e,5)

As you can see, for some characters the count is printed twice (a, c, e) and for others it is printed only once (b, d). I am not able to figure out what is going on. I think it may have something to do with the SlidingProcessingTimeWindow, but I am not sure.
Can someone explain to me what is going on?


Thanks and Regards,
Vishnu Viswanath
www.vishnuviswanath.com


Re: Behavior of SlidingProcessingTimeWindow with CountTrigger

Aljoscha Krettek
Hi,
I created a visualization to help explain the situation: http://s21.postimg.org/dofhcw52f/window_example.png
The SlidingProcessingTimeWindows assigner assigns elements to windows based on the current processing time. With a window size of 20 seconds and a slide of 10 seconds the windows overlap, so every element belongs to two windows. The CountTrigger only fires if a window contains 5 elements (or more). In your test, the windows for a, c and e fell into case b (both overlapping windows contain all five elements, so each of them fires), probably because you entered the letters very fast. For b and d we have case a: the elements were far enough apart, or you happened to enter them right on a window boundary, such that only one window contains all of them. The other windows don't contain enough elements to reach 5. In my drawing, window 1 contains 5 elements while window 2 only contains 3 of those elements.
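To make cases a and b concrete, here is a minimal plain-Scala model of the setup (no Flink dependency). It assumes windows aligned to t = 0, timestamps in whole seconds, a single key, and a trigger that, like CountTrigger, fires every 5 elements per window without clearing the window's contents. The names SlidingCountModel, assignWindows and run are hypothetical and are not part of Flink's API; this is a sketch of the behaviour, not Flink's implementation.

```scala
import scala.collection.mutable

// Hypothetical, simplified model (not Flink code): sliding windows aligned to
// t = 0, timestamps in seconds, and a count trigger that fires every
// `maxCount` elements per window without purging the window.
object SlidingCountModel {
  final case class Window(start: Long, end: Long) // covers [start, end)

  // All windows of length `size`, starting on multiples of `slide`, that contain `ts`.
  def assignWindows(ts: Long, size: Long, slide: Long): List[Window] = {
    val lastStart = ts - Math.floorMod(ts, slide)
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(_ > ts - size)
      .map(start => Window(start, start + size))
      .toList
  }

  // Feeds (x, 1) elements of one key; returns (window, sum) for every firing.
  def run(timestamps: Seq[Long], size: Long, slide: Long, maxCount: Int): List[(Window, Int)] = {
    val sinceLastFire = mutable.Map.empty[Window, Int].withDefaultValue(0)
    val sums = mutable.Map.empty[Window, Int].withDefaultValue(0)
    val fired = mutable.ListBuffer.empty[(Window, Int)]
    for (ts <- timestamps; w <- assignWindows(ts, size, slide)) {
      sums(w) += 1 // window contents are kept across firings
      sinceLastFire(w) += 1
      if (sinceLastFire(w) >= maxCount) {
        sinceLastFire(w) = 0 // the trigger's counter resets after each firing
        fired += ((w, sums(w)))
      }
    }
    fired.toList
  }

  def main(args: Array[String]): Unit = {
    // Five elements typed within one 10 s slide: both overlapping windows fire.
    println(run(Seq(1L, 2L, 3L, 4L, 5L), size = 20, slide = 10, maxCount = 5))
    // Five elements straddling t = 10: only the window [0, 20) fires.
    println(run(Seq(8L, 9L, 11L, 12L, 13L), size = 20, slide = 10, maxCount = 5))
  }
}
```

The first call corresponds to case b (two firings, each summing to 5); the second corresponds to case a, where [0, 20) fires once while its neighbours hold only 2 and 3 elements.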

I hope this helps.

Cheers,
Aljoscha
On 12 Mar 2016, at 19:19, Vishnu Viswanath <[hidden email]> wrote:


Re: Behavior of SlidingProcessingTimeWindow with CountTrigger

vishnuviswanath

Hi Aljoscha,

Wow, great illustration.

That was a very clear explanation. Yes, I did enter the elements quickly for case b, and I was seeing more of case a.
Also, I have sometimes seen a window fire when I enter only 1 or 2 elements; I believe that is an extension of case a, with respect to window 2.

Could you also explain what happens when I use an Evictor? For example:


import org.apache.flink.streaming.api.windowing.evictors.CountEvictor

val counts = socTextStream.flatMap { _.split("\\s") }
  .map { (_, 1) }
  .keyBy(0)
  .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
  .trigger(CountTrigger.of(5))
  .evictor(CountEvictor.of(3))
  .sum(1).setParallelism(4)

counts.print()
sev.execute()

For the input

a
a
a
a
a
b
b
b
b
b

I got the output:

1> (a,3)
1> (b,3)
2> (b,3)

My assumption was that when the trigger fires, the processing is done on all items in the window, and then 3 items are evicted from the window, which could also be part of the next processing of that window. But here it looks like the sum is calculated only on the items that were evicted from the window.

Could you please explain what is going on here?


Thanks and Regards,
Vishnu Viswanath,

On Mon, Mar 14, 2016 at 5:27 AM, Aljoscha Krettek <[hidden email]> wrote:

Re: Behavior of SlidingProcessingTimeWindow with CountTrigger

Aljoscha Krettek
Hi,
Sure, the evictors are a bit confusing (especially the fact that they are called evictors). They would more correctly be called “Keepers”. The process is the following:

1. The trigger fires.
2. The evictor decides which elements to keep: CountEvictor.of(3) says "keep only three elements", and all others are evicted.
3. The elements that remain after eviction are used for processing.
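The three steps above can be sketched as a plain-Scala model of one window firing (no Flink dependency). It assumes, as a simplification, that a count-based evictor keeps the most recent maxCount elements, and it does not model what happens to the window's state between firings. The names EvictorModel, keepLast and fire are hypothetical, not Flink's API. The point it illustrates is why (a,3) appears instead of (a,5): sum(1) only ever sees the kept elements.

```scala
// Hypothetical, simplified model of a single window firing with an evictor
// (not Flink code). Assumes the count-based evictor keeps the most recent
// elements; window state between firings is not modelled.
object EvictorModel {
  // Step 2: keep at most `maxCount` elements; everything older is evicted.
  def keepLast[T](buffer: Seq[T], maxCount: Int): Seq[T] =
    buffer.drop(math.max(0, buffer.size - maxCount))

  // Steps 1-3: the trigger has fired, the evictor runs, and only the kept
  // elements reach the sum(1) aggregation.
  def fire(buffer: Seq[(String, Int)], maxCount: Int): (String, Int) = {
    require(buffer.nonEmpty && maxCount > 0, "need elements to keep")
    val kept = keepLast(buffer, maxCount)
    (kept.head._1, kept.map(_._2).sum)
  }

  def main(args: Array[String]): Unit = {
    val window = Seq.fill(5)(("a", 1)) // a 5-count trigger fires on the 5th element
    println(fire(window, maxCount = 3)) // prints (a,3)
  }
}
```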

Nowadays we mostly have Evictors for legacy reasons, since the original window implementation was based on ideas from IBM InfoSphere Streams. See this part of their documentation for some explanation: https://www.ibm.com/support/knowledgecenter/#!/SSCRJU_4.0.0/com.ibm.streams.dev.doc/doc/windowhandling.html

- aljoscha

> On 14 Mar 2016, at 17:04, Vishnu Viswanath <[hidden email]> wrote:

Re: Behavior of SlidingProcessingTimeWindow with CountTrigger

vishnuviswanath

Hi Aljoscha,

Thank you for the explanation and the link on IBM InfoSphere. That explains why I am seeing (a,3) and (b,3) in my example.

Yes, the name Evictor is confusing. 

Thanks and Regards,
Vishnu Viswanath,
www.vishnuviswanath.com

On Mon, Mar 14, 2016 at 11:24 AM, Aljoscha Krettek <[hidden email]> wrote:
