Slide Window Compute Optimization

YennieChen88
Hi,
    I want to use sliding windows with a 1-hour window size and a 1-second
step size. I found that once an element arrives, it is processed in 3600
windows serially by one thread. It takes several seconds to finish processing
one element, much more than I expected. Is there any way to
optimize it?
    Thank you very much for your reply.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
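The figure of 3600 windows per element follows from window size / step size. As a minimal sketch of that arithmetic (plain Java, not Flink's actual window assigner; the class and method names are made up for illustration, and non-negative timestamps with no window offset are assumed):

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowAssignment {
    /** Returns the start times (ms) of every sliding window [start, start + size) containing the timestamp. */
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window that begins at or before the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 60 * 60 * 1000L; // 1 hour
        long slide = 1000L;          // 1 second
        List<Long> starts = windowStartsFor(1_530_786_720_500L, size, slide);
        System.out.println("windows per element: " + starts.size());
    }
}
```

With these settings every element belongs to size / slide windows, so any per-window work is repeated that many times per element.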

Re: Slide Window Compute Optimization

Kostas Kloudas
Hi,

You are correct that with sliding windows you will have 3600 “open windows” at any point.
Could you describe a bit more what you want to do?

If you simply want to update something like a counter every second, then you can
implement your own logic with a ProcessFunction, which lets you handle state and timers in a
custom way (see [1]).

Hope this helps,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html
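To illustrate the kind of custom logic meant here, the following is a rough sketch in plain Java rather than against the Flink API: it keeps one small bucket per second and a running total, so each element costs O(1) instead of touching 3600 windows. The class name, the method names, and the assumption that timestamps arrive in order are all made up for this example; in a real ProcessFunction the buckets would live in keyed state and the eviction would be driven by timers.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class BucketedHourlyCounter {
    private static final long BUCKET_MS = 1_000L;     // granularity, plays the role of the 1-second step
    private static final long WINDOW_MS = 3_600_000L; // 1 hour

    // Each entry is {bucketStartMs, count}; oldest bucket first.
    private final Deque<long[]> buckets = new ArrayDeque<>();
    private long total = 0;

    /** Records one event. Assumes timestamps are non-negative and arrive in order. */
    void add(long timestampMs) {
        long bucketStart = timestampMs - (timestampMs % BUCKET_MS);
        long[] newest = buckets.peekLast();
        if (newest != null && newest[0] == bucketStart) {
            newest[1]++;
        } else {
            buckets.addLast(new long[] {bucketStart, 1});
        }
        total++;
    }

    /** Returns the count for the last hour as of nowMs, dropping buckets that have fully expired. */
    long countAt(long nowMs) {
        while (!buckets.isEmpty() && buckets.peekFirst()[0] + BUCKET_MS <= nowMs - WINDOW_MS) {
            total -= buckets.pollFirst()[1];
        }
        return total;
    }

    public static void main(String[] args) {
        BucketedHourlyCounter counter = new BucketedHourlyCounter();
        counter.add(0L);
        counter.add(500L);
        counter.add(1_500L);
        System.out.println("count after 2 s: " + counter.countAt(2_000L));
        System.out.println("count after 1 h + 1 s: " + counter.countAt(3_601_000L));
    }
}
```

The result is only accurate to within one bucket, which is the same granularity a 1-second step would give.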

 

Re: Slide Window Compute Optimization

Rong Rong
Hi Yennie,

AFAIK, the sliding window will in fact duplicate each element into multiple different windows. There's a discussion thread regarding this [1].
We are looking into some performance improvements; can you provide some more info regarding your use case?

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-7001


Re: Slide Window Compute Optimization

YennieChen88
Hi Kostas and Rong,
    Thank you for your replies.
    Since both of you asked for more info about my use case, I will answer
both of you together.
    My use case is counting the number of successful and failed logins
within one hour, keyed by other login-related attributes (e.g. ip, device,
login type ...). Based on the counts for the previous hour, we judge
whether the next login is compliant.
    We have a strict requirement on Flink's compute time, to reduce the
impact on user login. From receiving from the source to sinking results into
the database, only about 10 ms is acceptable. Given this, we expect the
computed result to be as accurate as possible. The ideal case, with no error,
is that the latest sink time of the 1-hour computation is exactly the same as
the time of the user login whose compliance needs to be judged. Does that mean
the smaller the step size of the sliding window, the more accurate the results?
But it now seems that the smaller the step size, the longer the computation
takes, because once an element arrives it is processed in every window (number
of windows = window size / step size) serially by one thread.
   
Rong Rong wrote

> There's a discussion thread regarding this [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-7001
>
> On Thu, Jul 5, 2018 at 3:30 AM Kostas Kloudas wrote:
>
>> If you simply want to have an update of something like a counter every
>> second, then you can
>> implement your own logic with a ProcessFunction that allows to handle
>> state and timers in a
>> custom way (see [1]).
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html

Re: Slide Window Compute Optimization

Fabian Hueske-2
Hi Yennie,

You might want to have a look at the OVER windows of Flink's Table API or SQL [1].

An OVER window computes an aggregate (such as a count) for each incoming record over a range of previous events.
For example, the query:

SELECT ip, successful,
       COUNT(*) OVER (PARTITION BY ip, successful
                      ORDER BY loginTime
                      RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
  FROM logins;

computes, for each login attempt, the number of login attempts with the same ip and outcome in the previous hour.

There is no corresponding built-in operator in the DataStream API but SQL and Table API queries can be very easily integrated with DataStream programs [2].

Best, Fabian
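To make the OVER semantics concrete, here is a small plain-Java simulation of what the query above computes per record. It is only an illustration of the idea and not how Flink executes the query; the class and method names are invented, and it assumes logins arrive in loginTime order with distinct timestamps per key.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class OverWindowCount {
    private static final long RANGE_MS = 3_600_000L; // INTERVAL '1' HOUR

    // One queue of login times per (ip, successful) partition, oldest first.
    private final Map<String, Deque<Long>> timestampsByKey = new HashMap<>();

    /** Returns the number of logins in [loginTimeMs - 1 hour, loginTimeMs] for this partition. */
    long onLogin(String ip, boolean successful, long loginTimeMs) {
        String key = ip + "|" + successful;
        Deque<Long> timestamps = timestampsByKey.computeIfAbsent(key, k -> new ArrayDeque<>());
        timestamps.addLast(loginTimeMs);
        // Drop rows that precede the range; the current row itself is always kept.
        while (timestamps.peekFirst() < loginTimeMs - RANGE_MS) {
            timestamps.pollFirst();
        }
        return timestamps.size();
    }

    public static void main(String[] args) {
        OverWindowCount counts = new OverWindowCount();
        System.out.println(counts.onLogin("10.0.0.1", false, 0L));
        System.out.println(counts.onLogin("10.0.0.1", false, 1_800_000L));
        System.out.println(counts.onLogin("10.0.0.1", true, 1_900_000L));
    }
}
```

Unlike a sliding window, each record is stored once and produces exactly one result at the moment it arrives, so there is no step size to trade off against accuracy.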


Re: Slide Window Compute Optimization

Rong Rong
+1. Yes, your use case would probably fit the OVER aggregate best.

I actually wrote, for my own use, a note on some of the complex aggregate components that complements the official Flink SQL/Table API doc [1]. It might help you better understand how the OVER aggregate could fit your use case. Let me know if it is helpful :-)

@Fabian, if possible, please share some comments on the note when you have time. :-)

Thanks,
Rong
