Better way to share large data across task managers

Dongwon Kim-2
Hi,

I'm using Flink broadcast state in a way similar to what Fabian explained in [1]. One difference might be the size of the broadcast data: around 150 MB.

I've launched 32 TMs with the following settings:
- taskmanager.numberOfTaskSlots : 6
- parallelism of the non-broadcast side : 192

Here are some questions:
1) AFAIK, the broadcast data (150 MB) is sent to each of the 192 tasks. Is that right?
2) Is there a recommended way to broadcast the data only to the 32 TMs, so that the 6 tasks in each TM can all read it? I'm considering implementing a static class for the non-broadcast side that loads the data directly, only once per TaskManager, instead of using broadcast state (FYI, I'm using per-job clusters on YARN, so each TM serves only a single job). However, I'd prefer to use native Flink facilities if possible.
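
A minimal Java sketch of the static-class idea from question 2: one lazily initialized map per TaskManager JVM, shared by every task (slot) running in that JVM. `SharedLookup` and the loader passed to `get()` are hypothetical names, and the sketch assumes that all tasks of the job in one TM see the same copy of the class (i.e. they share a classloader), which is worth verifying for your deployment.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical per-JVM holder: every task (slot) in the same TaskManager JVM
// shares one immutable copy of the lookup map, loaded at most once.
// Assumes all tasks of the job in one TM load this class through the same classloader.
final class SharedLookup {
    // null until the first task triggers the load; volatile for safe publication
    private static volatile Map<Long, Integer> data;

    private SharedLookup() {}

    static Map<Long, Integer> get(Supplier<Map<Long, Integer>> loader) {
        Map<Long, Integer> local = data;
        if (local == null) {
            synchronized (SharedLookup.class) {
                local = data;
                if (local == null) { // double-checked: only one thread runs the loader
                    local = Collections.unmodifiableMap(new HashMap<>(loader.get()));
                    data = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger loads = new AtomicInteger();
        Supplier<Map<Long, Integer>> loader = () -> {
            loads.incrementAndGet();
            Map<Long, Integer> m = new HashMap<>();
            m.put(42L, 7); // stand-in for the real ~600K-entry map
            return m;
        };
        // Simulate the 6 slots of one TaskManager opening concurrently.
        Thread[] tasks = new Thread[6];
        for (int i = 0; i < tasks.length; i++) {
            tasks[i] = new Thread(() -> SharedLookup.get(loader));
            tasks[i].start();
        }
        for (Thread t : tasks) {
            t.join();
        }
        System.out.println("loads = " + loads.get()); // prints "loads = 1"
    }
}
```

In a Flink rich function, each task would call `SharedLookup.get(...)` from `open()`, so only the first task in the JVM pays the loading cost. This covers a one-time load only; it does not pick up new versions of the data.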

The broadcast data is a Map<Long, Int> with around 600K entries, so every time it is broadcast, heavy GC is inevitable on each TM due to the (de)serialization cost.
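
The numbers above can be turned into a quick back-of-envelope estimate. This sketch assumes every receiving task deserializes its own full copy, as question 1 supposes, and compares one copy per task (32 × 6 = 192 tasks) with one copy per TM.

```java
// Back-of-envelope cost of one broadcast update, using the numbers from this thread.
// Assumes each receiving task deserializes its own full copy of the map.
final class BroadcastCost {
    static final long TASK_MANAGERS = 32;
    static final long SLOTS_PER_TM = 6;
    static final long SIZE_MB = 150;
    static final long ENTRIES = 600_000;

    private BroadcastCost() {}

    // Total megabytes delivered when each receiver gets one full copy.
    static long megabytes(long receivers) {
        return receivers * SIZE_MB;
    }

    // Total map entries materialized (and later garbage-collected) per update.
    static long entriesDeserialized(long receivers) {
        return receivers * ENTRIES;
    }

    public static void main(String[] args) {
        long tasks = TASK_MANAGERS * SLOTS_PER_TM; // 192 parallel tasks
        // 28800 MB, 115200000 entries
        System.out.println("one copy per task: " + megabytes(tasks) + " MB, "
                + entriesDeserialized(tasks) + " entries");
        // 4800 MB, 19200000 entries
        System.out.println("one copy per TM:   " + megabytes(TASK_MANAGERS) + " MB, "
                + entriesDeserialized(TASK_MANAGERS) + " entries");
    }
}
```

Under that assumption, delivering the data once per TM instead of once per task cuts both the transferred volume and the number of deserialized entries by a factor of 6, the number of slots per TM.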

Any advice would be much appreciated.

Best,

Dongwon

[1] https://flink.apache.org/2019/06/26/broadcast-state.html

Re: Better way to share large data across task managers

Kostas Kloudas-2
Hi Dongwon,

If you know the data in advance, you can always use the YARN options
in [1] (e.g. "yarn.ship-directories") to ship the directories containing
the data only once to each YARN container (i.e. TM), and then write a
UDF that reads them in its open() method. This way the data is shipped
only once per TM, although each task will, of course, still have its
own copy in memory. If I am not mistaken, the visibility of shipped
files is APPLICATION by default [2], so if more than one TM lands on the
same node, even fewer copies will be shipped.

Does this help with your use case?

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
[2] https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/LocalResourceVisibility.html

On Sun, Sep 20, 2020 at 6:05 PM Dongwon Kim <[hidden email]> wrote:
> [...]
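
A minimal sketch of the UDF side of Kostas's suggestion: parsing a lookup file that has already been shipped to the container. The file format (one `key,value` pair per line, `#` for comments) is an assumption for illustration, as is the file's location under the container's working directory; adjust both to whatever you actually ship. In Flink, you would call the hypothetical `ShippedLookupLoader.load(...)` helper from the `open(Configuration)` method of a rich function such as `RichMapFunction`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical loader for a lookup file shipped to every TaskManager container.
// Assumed format: one "key,value" pair per line; blank lines and '#' comments are skipped.
final class ShippedLookupLoader {
    private ShippedLookupLoader() {}

    static Map<Long, Integer> load(Path file) throws IOException {
        Map<Long, Integer> result = new HashMap<>();
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                line = line.trim();
                if (line.isEmpty() || line.startsWith("#")) {
                    continue; // skip blank lines and comments
                }
                int comma = line.indexOf(',');
                if (comma < 0) {
                    throw new IOException("Malformed line (expected key,value): " + line);
                }
                long key = Long.parseLong(line.substring(0, comma).trim());
                int value = Integer.parseInt(line.substring(comma + 1).trim());
                result.put(key, value);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Demo with a temporary file standing in for the shipped one.
        Path sample = Files.createTempFile("lookup", ".csv");
        Files.write(sample, Arrays.asList("# key,value", "1,10", "2,20"), StandardCharsets.UTF_8);
        System.out.println(load(sample)); // prints {1=10, 2=20}
        Files.delete(sample);
    }
}
```

As Kostas points out, each task calling this in `open()` still builds its own copy of the map; keeping the result in a static field would bring that down to one copy per TM.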

Re: Better way to share large data across task managers

Dongwon Kim-2
Hi Kostas,

Thanks for the input!

BTW, I guess you're assuming that the broadcast occurs just once, for
bootstrapping, right?
My job needs not only bootstrapping but also periodically fetching a
new version of the data from some external storage.
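
A minimal sketch of a lookup that is bootstrapped once and then refreshed periodically, which could sit behind the static field Dongwon proposed. `RefreshingLookup` and its `fetcher` supplier (standing in for a read from the external storage) are hypothetical; each refresh builds a complete new map and swaps it in atomically, so readers never observe a half-updated version.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical lookup that is bootstrapped once and then refreshed on a fixed
// schedule; an instance of it could be stored in a static field per TM JVM.
final class RefreshingLookup implements AutoCloseable {
    private final Supplier<Map<Long, Integer>> fetcher; // stands in for reading external storage
    private final AtomicReference<Map<Long, Integer>> current;
    private final ScheduledExecutorService scheduler;

    RefreshingLookup(Supplier<Map<Long, Integer>> fetcher, long periodSeconds) {
        this.fetcher = fetcher;
        this.current = new AtomicReference<>(snapshot(fetcher.get())); // bootstrap load
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "lookup-refresher");
            t.setDaemon(true); // never keep the JVM alive just for refreshing
            return t;
        });
        scheduler.scheduleWithFixedDelay(this::refresh, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    private static Map<Long, Integer> snapshot(Map<Long, Integer> m) {
        return Collections.unmodifiableMap(new HashMap<>(m));
    }

    // Builds a complete new map, then swaps it in atomically.
    void refresh() {
        try {
            current.set(snapshot(fetcher.get()));
        } catch (RuntimeException e) {
            // Keep serving the previous version. Letting the exception escape would
            // cancel all future runs of scheduleWithFixedDelay. Log it in real code.
        }
    }

    Map<Long, Integer> get() {
        return current.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        AtomicInteger version = new AtomicInteger();
        Supplier<Map<Long, Integer>> fetcher = () -> {
            Map<Long, Integer> m = new HashMap<>();
            m.put(1L, version.incrementAndGet()); // pretend each fetch returns a newer version
            return m;
        };
        try (RefreshingLookup lookup = new RefreshingLookup(fetcher, 3600)) {
            System.out.println(lookup.get()); // prints {1=1}
            lookup.refresh();                 // force a refresh instead of waiting an hour
            System.out.println(lookup.get()); // prints {1=2}
        }
    }
}
```

Tasks would read through `get()` for every record. With one instance per TM, each refresh produces one new map per JVM rather than one per task, and the old map becomes garbage once no task still references it.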

Thanks,

Dongwon

> On Sep 23, 2020, at 4:59 AM, Kostas Kloudas <[hidden email]> wrote:
> [...]

Re: Better way to share large data across task managers

Kostas Kloudas-2
Hi Dongwon,

Yes, you are right that I was assuming the broadcast occurs only once.
This is what I meant by "If you know the data in advance"; sorry for
not being clear. If you need to periodically broadcast new versions of
the data, then I cannot think of a better solution than the one you
proposed with the static variable.
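
One practical detail of the static-variable approach is lifecycle: something has to create the shared object and, if it owns threads (such as a refresh scheduler), shut it down again. A minimal sketch, assuming each task calls `acquire()` in `open()` and `release()` in `close()`; `SharedResource` is a hypothetical helper, not a Flink API.

```java
import java.util.function.Supplier;

// Hypothetical reference-counted holder for a per-JVM shared resource (not a Flink API).
// Each task calls acquire() in open() and release() in close(): the first acquire
// creates the resource and the last release closes it.
final class SharedResource<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private T instance;   // guarded by "this"
    private int refCount; // guarded by "this"

    SharedResource(Supplier<T> factory) {
        this.factory = factory;
    }

    synchronized T acquire() {
        if (refCount == 0) {
            instance = factory.get(); // if this throws, refCount stays 0
        }
        refCount++;
        return instance;
    }

    synchronized void release() throws Exception {
        if (refCount == 0) {
            throw new IllegalStateException("release() without a matching acquire()");
        }
        refCount--;
        if (refCount == 0) {
            T toClose = instance;
            instance = null;
            toClose.close();
        }
    }

    synchronized int refCount() {
        return refCount;
    }

    public static void main(String[] args) throws Exception {
        SharedResource<AutoCloseable> shared =
                new SharedResource<AutoCloseable>(() -> () -> System.out.println("closed"));
        for (int i = 0; i < 6; i++) {
            shared.acquire(); // six tasks in one TM open
        }
        for (int i = 0; i < 6; i++) {
            shared.release(); // "closed" is printed once, on the last release
        }
    }
}
```

The holder itself would live in a `static final` field of the function class, so the first of the 6 tasks in a TM creates the resource and the last one to close releases it, and a background refresh thread does not outlive the tasks that use it.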

Cheers,
Kostas

On Wed, Sep 23, 2020 at 11:49 AM Dongwon Kim <[hidden email]> wrote:
> [...]