events eviction

events eviction

Radu Tudoran

Hello,

 

I am looking into the mechanisms for evicting events in Flink. I saw that, whether you use a default evictor or build a custom one, the logic is that the evictor provides the number of events to be discarded.

Could you please provide me with some additional pointers regarding the mechanism in Flink where this actually happens:

- The class that implements this functionality of discarding the events? (My initial expectation that this happens in the window class turned out to be wrong.) I checked and found the “EvictingNonKeyedWindowOperator” – is this indeed the right place to look?

- If yes, would it be possible to create a customized class like this one and somehow pass it to the framework? I would be curious whether there is an option other than modifying the core classes and recompiling the framework.

 

On a slightly parallel topic: is there some way of creating state in the evictor that will be checkpointed and restored in case of failure? I would be interested to know whether something like operator state is possible in the evictor.

 

Regards,

 

Dr. Radu Tudoran

Research Engineer - Big Data Expert

IT R&D Division

 

HUAWEI TECHNOLOGIES Duesseldorf GmbH

European Research Center

Riesstrasse 25, 80992 München

 

E-mail: [hidden email]

Mobile: +49 15209084330

Telephone: +49 891588344173

 

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany,
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN

This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it!

 

Re: events eviction

Aljoscha Krettek
Hi,
you are right, the logic is in EvictingNonKeyedWindowOperator.emitWindow() for non-parallel (non-keyed) windows and in EvictingWindowOperator.processTriggerResult() in the case of keyed windows.

You are also right about the contract of the Evictor: it returns the number of elements to be evicted from the beginning of the buffer. This also means that eviction does not consider any timestamps in the elements and is therefore quite arbitrary. The places in the code I mentioned above simply get the value from the Evictor and evict that many elements from the internal buffer/state.
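
To make the contract concrete, here is a minimal plain-Python model of it. This is not Flink code: `count_evictor` and `emit_window` are hypothetical names used only for illustration, and the sketch assumes a simple count-based policy. The evictor only returns a number, and the operator side drops that many elements from the front of the buffer, whatever their timestamps are.

```python
from collections import deque


def count_evictor(max_count):
    """Hypothetical helper: build an evictor that keeps at most max_count elements."""
    def evict(buffer_size):
        # The evictor only answers "how many", never "which ones".
        return max(0, buffer_size - max_count)
    return evict


def emit_window(buffer, evictor):
    """Model of the operator side: ask for a count, drop that many from the front."""
    to_evict = evictor(len(buffer))
    for _ in range(to_evict):
        buffer.popleft()
    return list(buffer)


# Elements are (timestamp, value) pairs in arrival order.
# The timestamps are deliberately out of order.
buffer = deque([(50, "a"), (10, "b"), (40, "c"), (20, "d")])
remaining = emit_window(buffer, count_evictor(2))
```

In this run `remaining` holds `(40, "c")` and `(20, "d")`. The element with timestamp 50 is dropped although it is the newest by timestamp, because eviction is purely positional.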

Right now it is not possible to replace the window operator that is used by Flink. What you can do is copy the window operator code and apply it manually using DataStream.transform().

About the evictor state: I’m afraid this is not possible right now. It was a conscious decision to make the Evictor stateless to make it easier for the system to handle. I would also strongly advise against using Evictors if at all possible. They make it impossible to incrementally aggregate window results (for example with a reduce function). This can have a huge impact on performance and memory footprint. In your case, what are you using them for?
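
The cost of giving up incremental aggregation can be illustrated with a small plain-Python model. Again this is not Flink code, and both classes are hypothetical. Without an evictor, a reduce function can fold each element into a single running value. With an evictor, every element has to stay buffered until the window fires, because any of them might still be evicted.

```python
from functools import reduce


class IncrementalWindow:
    """Hypothetical model: folds each element into one running aggregate."""

    def __init__(self, reduce_fn):
        self.reduce_fn = reduce_fn
        self.acc = None
        self.empty = True

    def add(self, value):
        if self.empty:
            self.acc, self.empty = value, False
        else:
            self.acc = self.reduce_fn(self.acc, value)

    def state_size(self):
        return 0 if self.empty else 1

    def fire(self):
        return self.acc


class BufferingWindow:
    """Hypothetical model: must keep every element because an evictor is present."""

    def __init__(self, reduce_fn, keep_last):
        self.reduce_fn = reduce_fn
        self.keep_last = keep_last
        self.elements = []

    def add(self, value):
        self.elements.append(value)

    def state_size(self):
        return len(self.elements)

    def fire(self):
        # Evict from the beginning, then aggregate whatever is left.
        to_evict = max(0, len(self.elements) - self.keep_last)
        del self.elements[:to_evict]
        return reduce(self.reduce_fn, self.elements) if self.elements else None


incremental = IncrementalWindow(lambda a, b: a + b)
buffering = BufferingWindow(lambda a, b: a + b, keep_last=3)
for v in range(1, 1001):
    incremental.add(v)
    buffering.add(v)

sizes_before_fire = (incremental.state_size(), buffering.state_size())
```

After 1000 elements the incremental window holds one value while the buffering window holds all 1000, which is the memory difference referred to above.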

I hope this helps somehow, but let us know if you need further explanations.

Cheers,
Aljoscha

> On 15 Feb 2016, at 11:09, Radu Tudoran <[hidden email]> wrote:

RE: events eviction

Radu Tudoran
Hi,

Thanks Aljoscha for the details!

The warning about performance and evictors is useful, but I am not sure it can always be put into practice. Take for example a GlobalWindow that you would use to aggregate data from multiple partitions. A GlobalWindow does not come with a trigger; would it then have a default evictor? Even if it has one, you still need to control the eviction of the events. Secondly, assume that you need to aggregate the data from 2 partitions and evict something only when you have one item from each partition. You would need some sort of state for this. And then, to ensure resiliency, the state should be recoverable if a crash happens. Could you approach this without evictor state?




-----Original Message-----
From: Aljoscha Krettek [mailto:[hidden email]]
Sent: Monday, February 15, 2016 11:58 AM
To: [hidden email]
Subject: Re: events eviction


Re: events eviction

Aljoscha Krettek
Hi,
yes, in some cases it could be necessary. Could you maybe give an example of what kind of window computation you want to achieve? Then we can see if it would be possible without GlobalWindows and an evictor.

Cheers,
Aljoscha

> On 15 Feb 2016, at 18:07, Radu Tudoran <[hidden email]> wrote:

RE: events eviction

Radu Tudoran
Hi,

Following up on the earlier example: you have to aggregate the data from 2 partitions (e.g. let's say in one you count objects of type 1 and in the other objects of type 2). Then you need to pair them together, emit that at iteration N you had (object T1 - Y, object T2 - X), and finally evict them together. Alternatively, you can also consider more complex operations than this simple aggregation (e.g. checking whether any object of type T1 is also an object of type T2).
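
One way to picture this use case is as a small stateful pairing step rather than as an evictor. The sketch below is a plain-Python model, not Flink API: `PairingBuffer`, `snapshot`, and `restore` are hypothetical names standing in for operator logic whose state the framework would checkpoint. Whether this maps cleanly onto a custom operator in Flink is an assumption. It buffers items per source, emits a pair as soon as both sides have one, and removes the paired items together.

```python
import copy
from collections import deque


class PairingBuffer:
    """Hypothetical model of a stateful step that pairs T1 and T2 items."""

    def __init__(self):
        self.pending = {"T1": deque(), "T2": deque()}

    def add(self, source, item):
        """Buffer an item; return a (t1, t2) pair once both sides have one, else None."""
        self.pending[source].append(item)
        if self.pending["T1"] and self.pending["T2"]:
            # Both items leave the state at the same time ("evicted together").
            return (self.pending["T1"].popleft(), self.pending["T2"].popleft())
        return None

    def snapshot(self):
        """Return a copy of the state, as a checkpoint would capture it."""
        return copy.deepcopy(self.pending)

    def restore(self, state):
        """Rebuild the state from a snapshot after a simulated failure."""
        self.pending = copy.deepcopy(state)


original = PairingBuffer()
first = original.add("T1", "Y")      # only one side present so far
checkpoint = original.snapshot()

# Simulate a crash: a fresh instance is restored from the checkpoint.
recovered = PairingBuffer()
recovered.restore(checkpoint)
pair = recovered.add("T2", "X")
```

Here `first` is `None` because only one side has arrived, and after the simulated recovery `pair` is `("Y", "X")`, with both items removed from the pending state.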


-----Original Message-----
From: Aljoscha Krettek [mailto:[hidden email]]
Sent: Friday, February 19, 2016 10:47 AM
To: [hidden email]
Subject: Re: events eviction
