Hi,
I'm using Flink broadcast state similar to what Fabian explained in [1]. One difference might be the size of the broadcast data: it is around 150MB.

I've launched 32 TMs by setting
- taskmanager.numberOfTaskSlots : 6
- parallelism of the non-broadcast side : 192

Here are some questions:
1) AFAIK, the broadcast data (150MB) is sent to all 192 tasks. Is that right?
2) Is there a recommended way to broadcast data only to the 32 TMs, so that the 6 tasks in each TM can read the broadcast data? I'm considering implementing a static class for the non-broadcast side that loads the data directly, only once per TaskManager, instead of using broadcast state (FYI, I'm using per-job clusters on YARN, so each TM serves only a single job). However, I'd like to use Flink's native facilities if possible.

The type of the broadcast data is Map<Long, Int> with around 600K entries, so every time the data is broadcast, heavy GC on each TM is inevitable due to the (de)serialization cost.

Any advice would be much appreciated.

Best,

Dongwon

[1] https://flink.apache.org/2019/06/26/broadcast-state.html
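A rough estimate of what question 1 implies, as a minimal Java sketch. It assumes that every consumer holds a full ~150 MB copy of the data and ignores any difference between serialized size and on-heap size, so the numbers are only indicative. The class name is illustrative.

```java
// Back-of-the-envelope memory estimate for the setup described above.
// Assumption: every consumer holds its own full ~150 MB copy of the data.
class BroadcastCopyEstimate {
    static final long PAYLOAD_MB = 150;    // size of the broadcast data (from the thread)
    static final int TASK_MANAGERS = 32;   // number of TMs launched
    static final int SLOTS_PER_TM = 6;     // taskmanager.numberOfTaskSlots

    /** Total MB held when each of the given consumers keeps one copy. */
    static long totalMb(int consumers) {
        return consumers * PAYLOAD_MB;
    }

    public static void main(String[] args) {
        int tasks = TASK_MANAGERS * SLOTS_PER_TM;  // 192 parallel tasks
        System.out.println("one copy per task, cluster-wide: " + totalMb(tasks) + " MB");
        System.out.println("one copy per TM, cluster-wide:   " + totalMb(TASK_MANAGERS) + " MB");
        System.out.println("one copy per task, inside one TM: " + totalMb(SLOTS_PER_TM) + " MB");
    }
}
```

Under these assumptions, keeping one copy per TM instead of one per task cuts the footprint by a factor equal to the number of slots per TM (6 here).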
Hi Dongwon,
If you know the data in advance, you can always use the YARN options in [1] (e.g. "yarn.ship-directories") to ship the directories containing the data only once to each YARN container (i.e. each TM), and then write a UDF that reads them in its open() method. This way the data is shipped only once per TM, although each task will of course still hold its own copy in memory. If I am not mistaken, the visibility of shipped files defaults to APPLICATION [2], so if more than one TM lands on the same node, even fewer copies will be shipped.

Does this help with your use case?

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
[2] https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/LocalResourceVisibility.html
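A minimal sketch of the "read the shipped files in open()" idea, written as plain Java so it runs without Flink on the classpath. The file format (one key,value pair per line), the class name ShippedLookupLoader, and the file location are hypothetical. In an actual job the load(...) call would sit in the open(Configuration) method of a rich function, and the path would point at wherever the shipped directory ends up in the container.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of loading a shipped lookup file once, at function initialization.
// Assumptions (hypothetical): plain-text file, one "key,value" pair per line.
class ShippedLookupLoader {

    /** Parses "key,value" lines into a map; blank lines are skipped. */
    static Map<Long, Integer> load(Path file) throws IOException {
        Map<Long, Integer> table = new HashMap<>();
        List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        for (String line : lines) {
            if (line.trim().isEmpty()) continue;
            String[] parts = line.split(",", 2);
            if (parts.length != 2) throw new IOException("malformed line: " + line);
            table.put(Long.parseLong(parts[0].trim()), Integer.parseInt(parts[1].trim()));
        }
        return table;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a shipped file; in a real job this would be a path such as
        // Paths.get("lookup-data/table.csv") (hypothetical) read inside open().
        Path tmp = Files.createTempFile("lookup", ".csv");
        Files.write(tmp, Arrays.asList("1,10", "2,20", "", "3,30"), StandardCharsets.UTF_8);
        Map<Long, Integer> table = load(tmp);
        System.out.println(table.size() + " entries, key 2 -> " + table.get(2L));
        Files.delete(tmp);
    }
}
```

Each of the six tasks in a TM would run this load independently, which is why each task still ends up with its own in-memory copy even though the file is shipped only once.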
Hi Kostas,
Thanks for the input!

BTW, I guess you're assuming that the broadcast happens just once, for bootstrapping? My job needs not only bootstrapping but also periodic fetching of a new version of the data from some external storage.

Thanks,

Dongwon
Hi Dongwon,
Yes, you are right that I assumed the broadcast happens only once. This is what I meant by "If you know the data in advance"; sorry for not being clear.

If you need to periodically broadcast new versions of the data, then I cannot find a better solution than the one you proposed with the static variable.

Cheers,
Kostas
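A minimal sketch of the static-variable approach with periodic refresh, in plain Java (no Flink classes). It assumes, as in Dongwon's per-job YARN setup, that all tasks of the job running in one TM see the same static fields. The fetcher passed in is a hypothetical stand-in for reading the latest version from external storage, and SharedLookupTable is an illustrative name, not a Flink API. Each task would call ensureStarted(...) from its open() method and lookup(...) per record; only the first caller in the JVM actually fetches, and later versions are swapped in atomically so readers never see a half-built map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// One copy of the lookup table per JVM (i.e. per TM), refreshed periodically.
// Assumptions (hypothetical): the fetcher stands in for external storage, and
// all tasks of the job on a TM share this class's static fields.
final class SharedLookupTable {
    private static final AtomicReference<Map<Long, Integer>> CURRENT = new AtomicReference<>();
    private static ScheduledExecutorService refresher; // guarded by the class lock

    private SharedLookupTable() {}

    /** Called from each task's open(); only the first caller loads and starts the refresher. */
    static synchronized void ensureStarted(Supplier<Map<Long, Integer>> fetcher, long periodMillis) {
        if (refresher != null) return;  // already initialised in this JVM
        CURRENT.set(Collections.unmodifiableMap(new HashMap<>(fetcher.get())));  // initial load
        refresher = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "lookup-refresher");
            t.setDaemon(true);  // never keep the JVM alive
            return t;
        });
        refresher.scheduleAtFixedRate(() -> {
            try {
                // Build the new version off to the side, then swap it in atomically.
                CURRENT.set(Collections.unmodifiableMap(new HashMap<>(fetcher.get())));
            } catch (RuntimeException e) {
                // Keep serving the previous version if a refresh fails.
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Lock-free read against whichever snapshot is current. */
    static Integer lookup(long key) {
        Map<Long, Integer> snapshot = CURRENT.get();
        return snapshot == null ? null : snapshot.get(key);
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger fetches = new AtomicInteger();
        // Hypothetical fetcher: returns a tiny table tagged with a version number.
        Supplier<Map<Long, Integer>> fetcher = () -> {
            Map<Long, Integer> m = new HashMap<>();
            m.put(42L, fetches.incrementAndGet());
            return m;
        };
        Thread[] tasks = new Thread[6];  // simulate the six slots of one TM
        for (int i = 0; i < tasks.length; i++) {
            tasks[i] = new Thread(() -> SharedLookupTable.ensureStarted(fetcher, 60_000L));
            tasks[i].start();
        }
        for (Thread t : tasks) t.join();
        System.out.println("fetches: " + fetches.get() + ", value: " + SharedLookupTable.lookup(42L));
    }
}
```

Because readers only ever see immutable snapshots, the refresh thread never blocks the processing threads; the trade-off is that the old and new versions briefly coexist on the heap during each swap.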