how does flink assign windows to task

vishnuviswanath
Hi,

Let's say I have a window on a keyed stream, and I have about 100 unique keys.
Assume I have about 50 task slots in my cluster, and suppose my trigger fires 70 of the 100 windows/panes at the same time.

How will Flink handle this? Will it assign 50 of the 70 triggered windows to the 50 available task slots and wait for 20 of them to finish before assigning the remaining 20 to the slots?

Thanks,
Vishnu Viswanath

Re: how does flink assign windows to task

Maximilian Michels
Hi Vishnu Viswanath,

The keyed elements are spread across the 50 task slots (assuming you
have a parallelism of 50) using hash partitioning on the keys. Each
task slot runs one or multiple operators (depending on the slot
sharing options). One of them is a WindowOperator which will decide
when to trigger and process your keyed elements.
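To make the hash-partitioning point concrete, here is a minimal sketch under simplifying assumptions: each key is hashed and the hash is taken modulo the parallelism to pick a slot. The `crc32` hash and the helpers `slot_for_key` and `distribute` are hypothetical stand-ins chosen only for reproducibility; Flink uses its own hash function, so the exact placement of keys will differ.

```python
import zlib
from collections import Counter


def slot_for_key(key: str, parallelism: int) -> int:
    """Map a key to a parallel instance (slot) by hashing it.

    Hypothetical stand-in: crc32 is deterministic across runs,
    but it is not the hash function Flink uses.
    """
    return zlib.crc32(key.encode("utf-8")) % parallelism


def distribute(keys, parallelism):
    """Return a Counter of slot index -> number of keys routed to that slot."""
    return Counter(slot_for_key(k, parallelism) for k in keys)


keys = [f"key-{i}" for i in range(100)]
counts = distribute(keys, parallelism=50)

# Every key lands on exactly one slot, so the counts add up to 100.
print(sum(counts.values()))
# Keys in the busiest slot, and how many slots received no key at all
# (slots with no keys do not appear in the Counter).
print(max(counts.values()), 50 - len(counts))
```

Because the slot depends only on the key, all elements of one key reach the same slot. The sketch also shows that 100 keys over 50 slots only averages 2 keys per slot; the busiest slot typically gets more.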

The WindowOperator holds the WindowAssigner and the Trigger. The
WindowAssigner will determine which window an incoming element gets
assigned to. Windows are kept for each key; the combination of window and
key is usually called a Pane. The Trigger will go through all the Panes
and check if they should fire or not (whether the window function
should be executed). This happens one after another in a single task
slot but in parallel across all the task slots.
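The per-slot part of this description can be pictured with a toy model. This is not Flink's WindowOperator: the class `ToyWindowOperator`, its single global window, and the count-based firing rule (`max_count`) are all hypothetical. Panes are stored per (key, window), and one operator instance handles its elements and panes strictly one after another; in this model, parallelism comes only from running one such instance per slot.

```python
from collections import defaultdict

GLOBAL_WINDOW = "global"  # hypothetical: a single window shared by all keys


class ToyWindowOperator:
    """Simplified model of one slot's window operator (not Flink's class)."""

    def __init__(self, max_count):
        self.max_count = max_count
        # A pane is identified by (key, window) and buffers that key's elements.
        self.panes = defaultdict(list)

    def assign_window(self, element):
        # Stand-in for a WindowAssigner: every element goes to the global window.
        return GLOBAL_WINDOW

    def should_fire(self, pane):
        # Stand-in for a count-based Trigger.
        return len(pane) >= self.max_count

    def process(self, key, element):
        """Handle one element; return (key, window, sum) if its pane fires."""
        window = self.assign_window(element)
        pane = self.panes[(key, window)]
        pane.append(element)
        if self.should_fire(pane):
            result = (key, window, sum(pane))  # the "window function"
            del self.panes[(key, window)]      # purge the fired pane
            return result
        return None


op = ToyWindowOperator(max_count=3)
fired = []
for key, value in [("a", 1), ("b", 10), ("a", 2), ("b", 20), ("a", 3)]:
    out = op.process(key, value)  # elements are handled strictly in sequence
    if out is not None:
        fired.append(out)

print(fired)            # only key "a" has reached 3 elements
print(dict(op.panes))   # key "b" still has an open pane
```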

Just a brief explanation. Hope it helps :)

Cheers,
Max

On Thu, Jul 28, 2016 at 5:49 PM, Vishnu Viswanath
<[hidden email]> wrote:

> Hi,
>
> Let's say I have a window on a keyed stream, and I have about 100 unique
> keys.
> Assume I have about 50 task slots in my cluster, and suppose my trigger
> fires 70 of the 100 windows/panes at the same time.
>
> How will Flink handle this? Will it assign 50 of the 70 triggered windows
> to the 50 available task slots and wait for 20 of them to finish before
> assigning the remaining 20 to the slots?
>
> Thanks,
> Vishnu Viswanath

Re: how does flink assign windows to task

vishnuviswanath
Hi Max,

Thanks for the explanation.

"This happens one after another in a single task slot but in parallel across all the task slots". 
Could you explain in more detail how this happens in parallel? Which part occurs in parallel? Is it the Trigger going through each pane and the window function being executed?
As in the first example, if there are 100 Panes (since I have 1 window and 100 keys), will the trigger go through these 100 Panes using 50 task slots and then execute whichever fires? Does that mean that Flink determines the set of Panes that has to be evaluated in each task slot, and then the trigger goes through it?

The reason I am trying to understand exactly how it works is that I need to decide how much memory each node in my cluster should have. I know that a single pane would not cause an OOM in my case (since the number of elements per pane is not huge), but nodes might not have enough memory to hold the entire window (since I can have a large number of Panes).

Thanks,
Vishnu


On Fri, Jul 29, 2016 at 4:29 AM, Maximilian Michels <[hidden email]> wrote:
Hi Vishnu Viswanath,

The keyed elements are spread across the 50 task slots (assuming you
have a parallelism of 50) using hash partitioning on the keys. Each
task slot runs one or multiple operators (depending on the slot
sharing options). One of them is a WindowOperator which will decide
when to trigger and process your keyed elements.

The WindowOperator holds the WindowAssigner and the Trigger. The
WindowAssigner will determine which window an incoming element gets
assigned to. Windows are kept for each key; the combination of window and
key is usually called a Pane. The Trigger will go through all the Panes
and check if they should fire or not (whether the window function
should be executed). This happens one after another in a single task
slot but in parallel across all the task slots.

Just a brief explanation. Hope it helps :)

Cheers,
Max

Re: how does flink assign windows to task

Sameer Wadkar
Vishnu,

Based on Max's explanation and on how other systems like MapReduce and Spark partition keys, I would imagine that if you have 100 keys and 50 slots, roughly 2 keys would be assigned to each slot (on average; hash partitioning does not guarantee an exact split). Each slot would maintain one or more windows (more for time-based windows), and each window would have up to 2 panes (depending on whether there are elements for a key in a given window). The trigger would evaluate which of these panes fire for a global window (count windows), or which window as a whole fires for a time window.
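For the time-window case, a rough sketch under toy assumptions: tumbling windows of `size` time units are assigned by rounding each timestamp down, one slot keeps a pane per (key, window) for the keys it owns, and when a watermark passes a window's end, all of that window's panes in the slot fire together. The helpers `assign_tumbling` and `fire_on_watermark` are hypothetical, and the firing condition is simplified; Flink's own event-time trigger differs in detail.

```python
from collections import defaultdict


def assign_tumbling(ts, size):
    """Return the [start, end) tumbling window that a timestamp falls into."""
    start = ts - ts % size
    return (start, start + size)


def fire_on_watermark(panes, watermark):
    """Fire and remove every pane whose window end is <= watermark.

    panes maps (key, (start, end)) -> list of values held by one slot.
    Simplified firing rule; returns results sorted for stable output.
    """
    ready = [kw for kw in panes if kw[1][1] <= watermark]
    results = sorted((key, window, sum(panes[(key, window)])) for key, window in ready)
    for kw in ready:
        del panes[kw]  # a fired pane's state is released
    return results


# One slot that (by hashing) owns keys "a" and "b".
panes = defaultdict(list)
for key, ts, value in [("a", 1, 1), ("b", 3, 5), ("a", 12, 7), ("b", 14, 2)]:
    panes[(key, assign_tumbling(ts, size=10))].append(value)

print(fire_on_watermark(panes, watermark=10))  # window [0, 10) fires for both keys
print(sorted(panes))                            # panes of window [10, 20) remain open
```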

This seems like the only way to get the most efficient utilization of the entire cluster and to allow all keys to be evaluated simultaneously, without some keys being starved by keys that receive more elements in case of skew.

So I think you will need enough memory to hold all the elements that can arrive for all the active (not yet triggered) windows for two keys in a task. For count windows this is easy to estimate, but for time windows it is less clear if you receive elements out of order.
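This memory estimate can be written out as simple arithmetic. It is a back-of-the-envelope sketch: the functions and all input numbers (1000 elements per pane, 200 bytes per element, 3 open windows) are hypothetical, and JVM object overhead and the state backend are ignored. Since hash partitioning rarely splits keys perfectly evenly, it is safer to plug in the key count of the busiest slot rather than the average.

```python
import math


def count_window_bytes(keys_on_slot, window_count, bytes_per_element):
    """Worst-case buffered bytes for count windows on one slot.

    Assumes each key has one open pane that holds up to `window_count`
    elements before it fires and is purged. Ignores JVM/state-backend overhead.
    """
    return keys_on_slot * window_count * bytes_per_element


def time_window_bytes(keys_on_slot, open_windows, elements_per_pane, bytes_per_element):
    """Rough bytes for time windows: every open (not yet fired) window may hold
    one pane per key. `open_windows` grows when out-of-order elements force
    windows to stay open longer."""
    return keys_on_slot * open_windows * elements_per_pane * bytes_per_element


avg_keys = math.ceil(100 / 50)  # 100 keys over 50 slots: 2 keys per slot on average
print(count_window_bytes(avg_keys, window_count=1000, bytes_per_element=200))
print(time_window_bytes(avg_keys, open_windows=3, elements_per_pane=1000, bytes_per_element=200))
```

The time-window figure is the harder one to pin down: the more out of order the input is, the larger `open_windows` has to be.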

Let's see what Max replies. I am just reasoning about how Flink should work, based on how other similar systems do it.

Sameer


On Jul 29, 2016, at 9:51 PM, Vishnu Viswanath <[hidden email]> wrote:

Hi Max,

Thanks for the explanation.

"This happens one after another in a single task slot but in parallel across all the task slots". 
Could you explain in more detail how this happens in parallel? Which part occurs in parallel? Is it the Trigger going through each pane and the window function being executed?
As in the first example, if there are 100 Panes (since I have 1 window and 100 keys), will the trigger go through these 100 Panes using 50 task slots and then execute whichever fires? Does that mean that Flink determines the set of Panes that has to be evaluated in each task slot, and then the trigger goes through it?

The reason I am trying to understand exactly how it works is that I need to decide how much memory each node in my cluster should have. I know that a single pane would not cause an OOM in my case (since the number of elements per pane is not huge), but nodes might not have enough memory to hold the entire window (since I can have a large number of Panes).

Thanks,
Vishnu


Re: how does flink assign windows to task

Till Rohrmann
Yes, you're right, Sameer. That's how things work in Flink.

Cheers,
Till

On Sun, Jul 31, 2016 at 12:33 PM, Sameer Wadkar <[hidden email]> wrote:
Vishnu,

Based on Max's explanation and on how other systems like MapReduce and Spark partition keys, I would imagine that if you have 100 keys and 50 slots, roughly 2 keys would be assigned to each slot (on average; hash partitioning does not guarantee an exact split). Each slot would maintain one or more windows (more for time-based windows), and each window would have up to 2 panes (depending on whether there are elements for a key in a given window). The trigger would evaluate which of these panes fire for a global window (count windows), or which window as a whole fires for a time window.

This seems like the only way to get the most efficient utilization of the entire cluster and to allow all keys to be evaluated simultaneously, without some keys being starved by keys that receive more elements in case of skew.

So I think you will need enough memory to hold all the elements that can arrive for all the active (not yet triggered) windows for two keys in a task. For count windows this is easy to estimate, but for time windows it is less clear if you receive elements out of order.

Let's see what Max replies. I am just reasoning about how Flink should work, based on how other similar systems do it.

Sameer


Re: how does flink assign windows to task

vishnuviswanath
Thanks, Sameer and Till.


On Mon, Aug 1, 2016 at 9:31 AM, Till Rohrmann <[hidden email]> wrote:
Yes, you're right, Sameer. That's how things work in Flink.

Cheers,
Till
