Running Flink on Yarn

Running Flink on Yarn

Anil
CONTENTS DELETED
The author has deleted this message.

Re: Running Flink on Yarn

Andrey Zagrebin
Hi,

I suppose you apply windowing to a keyed stream or use a SQL time-windowed join?
Globally windowed streams are non-parallel and are processed/stored in one slot.

In the case of a keyed stream, the total range of key values is distributed among the slots.
Each slot processes/stores only a subrange of the keys.
Window state is then stored per key.
This implies that each slot stores its own data, not the same data.
The keyed state is not shared among slots in the same JVM.
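
For example (just a sketch with made-up names, not your actual pipeline), a keyed processing-time window where each parallel subtask keeps window contents only for its own subrange of keys:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class KeyedWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(9); // 9 parallel subtasks, e.g. one per slot

        env.socketTextStream("localhost", 9999)                     // placeholder source of order ids
           .keyBy(value -> value)                                   // hash-partitions the key space across the 9 subtasks
           .window(TumblingProcessingTimeWindows.of(Time.hours(2)))
           .reduce((a, b) -> a)                                     // window state lives per key, only in the subtask that owns that key
           .print();

        env.execute("keyed-window-sketch");
    }
}

With a non-keyed windowAll(), by contrast, the stream is not partitioned and the whole window is processed by a single parallel instance.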

Best,
Andrey

> On 23 Dec 2018, at 21:35, Anil <[hidden email]> wrote:
>
> I have a setup for Flink (1.4.2) with YARN. I'm using the Flink YARN client for
> deploying my jobs to the YARN cluster.
>
> In the current setup, parallelism is directly mapped to the number of cores,
> with each parallel instance of the job running in one container. So for a
> parallelism of 9, there are 10 containers - 1 JM and 9 TMs - and each container
> has 1 core. Each container (or each parallel instance) has one task manager
> and each slot holds the entire pipeline for the job.
>
> Most of the jobs have a join, with the window storing data for the last 2-3 hours.
> As per my understanding here,
> each container will save its own copy of this last 2-3 hours of data, and
> this is not shared between two containers.
>
> Since this window data will be the same across each container, I feel that if I
> could have one task manager with multiple task slots that could share this
> window data, I could save a lot of resources (each container won't need to
> maintain its own copy of the window data). If I had 3 containers, each with one
> TM and 3 task slots, then I would need only 3 containers for my job to
> achieve a parallelism of 9 (each task slot will hold the entire job
> pipeline, so each container gives me a parallelism of 3 on its own). I'm
> assuming that this window data will be shared among all parallel instances
> running in different task slots in each container. Please correct me here.
>
> As per the Flink docs -
>
> Having multiple slots means more subtasks share the same JVM. Tasks in the
> same JVM share TCP connections (via multiplexing) and heartbeat messages.
> They may also share data sets and data structures, thus reducing the
> per-task overhead.


Re: Running Flink on Yarn

Anil
CONTENTS DELETED
The author has deleted this message.

Re: Running Flink on Yarn

Andrey Zagrebin-2

On Mon, Dec 24, 2018 at 6:29 PM Anil <[hidden email]> wrote:
Thanks for the quick response, Andrey. I'm doing a SQL time-windowed join on a
non-keyed stream.
So all the threads in the various task slots in the same TM will share this state.




Re: Running Flink on Yarn

Anil
CONTENTS DELETED
The author has deleted this message.

Re: Running Flink on Yarn

Andrey Zagrebin-2
I think the data buffered for the join will be distributed among threads by order_id (a1 and a2 will be internally keyed).
Each thread will have its own non-shared window state (for 2 hours) for a certain subset of order_ids.
Slots will share some common JVM resources mentioned in the docs, and also access to the state DB, but not the bulk of the storage occupied by the state.
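
Very roughly, it behaves like this hand-written sketch (simplified, hypothetical row types, not the code the SQL planner actually generates; org.apache.flink.* imports omitted): both inputs are keyed on the join key, and the buffered rows live in keyed state, so each subtask buffers only the order_ids hashed to it:

// Order1/Order2 are hypothetical POJOs standing in for rows of awz_s3_stream1 / awz_s3_stream2.
DataStream<Order1> a1 = ...;
DataStream<Order2> a2 = ...;

a1.keyBy(o -> o.orderId)                  // same key on both sides, so matching rows
  .connect(a2.keyBy(o -> o.orderId))      // always land in the same parallel subtask
  .process(new CoProcessFunction<Order1, Order2, String>() {

      // keyed state: scoped to the current order_id, stored only in the subtask that owns it
      private transient ListState<Order1> leftBuffer;
      private transient ListState<Order2> rightBuffer;

      @Override
      public void open(Configuration parameters) {
          leftBuffer = getRuntimeContext().getListState(
              new ListStateDescriptor<>("left", Order1.class));
          rightBuffer = getRuntimeContext().getListState(
              new ListStateDescriptor<>("right", Order2.class));
      }

      @Override
      public void processElement1(Order1 left, Context ctx, Collector<String> out) throws Exception {
          leftBuffer.add(left);                        // buffered here; time-based expiry via timers omitted
          for (Order2 right : rightBuffer.get()) {     // join against the other side's buffer for this key
              out.collect(left.orderId + "," + right.restaurantId);
          }
      }

      @Override
      public void processElement2(Order2 right, Context ctx, Collector<String> out) throws Exception {
          rightBuffer.add(right);
          for (Order1 left : leftBuffer.get()) {
              out.collect(left.orderId + "," + right.restaurantId);
          }
      }
  });

So two slots never look into each other's buffers; what slots in the same TM do share are the JVM-level resources (TCP connections, heartbeats, possibly common data structures) mentioned in the docs.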

cc Timo, Piotr

On Mon, Dec 24, 2018 at 7:46 PM Anil <[hidden email]> wrote:
I am using a time-windowed join only. Here's a sample query -

SELECT a1.order_id, a2.order.restaurant_id
FROM awz_s3_stream1 a1
INNER JOIN awz_s3_stream2 a2
  ON CAST(a1.order_id AS VARCHAR) = a2.order_id
  AND a1.to_state = 'PLACED'
  AND a1.proctime BETWEEN a2.proctime - INTERVAL '2' HOUR
                      AND a2.proctime + INTERVAL '2' HOUR
GROUP BY HOP(a2.proctime, INTERVAL '2' MINUTE, INTERVAL '1' HOUR),
         a2.`order`.restaurant_id

Just to simplify my question -

Suppose I have a TM with 4 slots and I deploy a Flink job with parallelism=4
using 2 containers - 1 JM and 1 TM. Each parallel instance will be deployed in
one task slot in the TM (the entire job pipeline running per slot). My
job does a join (a SQL time-windowed join on a non-keyed stream) and buffers the
last few hours of data. My question is: will these threads running in
different task slots share this data buffered for the join? What data is
shared across these threads?





Re: Running Flink on Yarn

Anil
CONTENTS DELETED
The author has deleted this message.