Hi all,
I have a Flink batch job that reads a Parquet dataset and then applies two flatMaps to it (see the pseudocode below). The problem is that this dataset is quite big and Flink duplicates it before sending the data to these two operators (I inferred this from the doubled amount of sent bytes). Is there a way to avoid this behaviour?

Here's the pseudocode of my job:

DataSet X = readParquetDir();
X1 = X.flatMap(...);
X2 = X.flatMap(...);

Best,
Flavio
Hi Flavio,
Which version of Flink are you using?

Thanks,
Amit
Flink 1.3.1 (I'm waiting for 1.5 before upgrading).
Hi Flavio,

No, there's no way around it. A DataSet that is consumed by more than one operator cannot be chained with its consumers, so the records need to be copied to avoid concurrent modifications. However, the data should not be shipped over the network if all operators have the same parallelism. Instead, the records are serialized and handed over via local in-memory byte[] channels.
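To illustrate the point about parallelism, here is a minimal sketch of such a branching job with one uniform parallelism, so the branched records are handed over locally instead of being repartitioned. This is not the original job: the record type, the `readParquetDir` helper, and the mapper classes are hypothetical stand-ins.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BranchingJobSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // One global parallelism: the source and both flatMaps run with the
        // same degree, so branched records travel through local in-memory
        // channels rather than over the network.
        env.setParallelism(8);

        DataSet<String> x = readParquetDir(env); // hypothetical helper for the Parquet source
        DataSet<String> x1 = x.flatMap(new FirstMapper());
        DataSet<String> x2 = x.flatMap(new SecondMapper());
        // ... use x1 and x2 ...
    }

    // Illustrative mappers; the real job's logic would go here.
    static class FirstMapper implements FlatMapFunction<String, String> {
        public void flatMap(String v, Collector<String> out) { out.collect(v.toUpperCase()); }
    }

    static class SecondMapper implements FlatMapFunction<String, String> {
        public void flatMap(String v, Collector<String> out) { out.collect(v.toLowerCase()); }
    }

    static DataSet<String> readParquetDir(ExecutionEnvironment env) {
        return env.fromElements("a", "b"); // stand-in for the actual Parquet input format
    }
}
```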
Hi Fabian, thanks for the detailed reply. The problem I see is that the source dataset is huge and, since it doesn't fit in memory, it's spilled to disk twice (I checked the increasing disk usage during the job and it corresponded exactly to the size estimated by the Flink UI, i.e. twice its initial size). This is probably not a problem as long as the data stays in memory, but in my case this memory explosion is very problematic :(
Flavio Pompermaier
Development Department
OKKAM S.r.l.
Tel. +(39) 0461 041809
That will happen if you join (or coGroup) the branched DataSets, i.e., if your plan has a branching-and-merging pattern. You could change the join strategy to a sort-merge join, but that will sort both inputs and also spill both of them to disk.
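For reference, a sort-merge join can be requested with a join hint in the DataSet API. A sketch, assuming `x1` and `x2` are the two branches from the earlier pseudocode; the tuple type and key positions are illustrative:

```java
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

// REPARTITION_SORT_MERGE sorts both inputs on the join key and merges
// them, which spills both inputs if they don't fit in memory.
DataSet<Tuple2<Tuple2<Long, String>, Tuple2<Long, String>>> joined =
    x1.join(x2, JoinHint.REPARTITION_SORT_MERGE)
      .where(0)      // join key of the first branch (illustrative)
      .equalTo(0);   // join key of the second branch (illustrative)
```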
Does this duplication happen when I write directly to disk after the flatMaps?
The spilling will only happen when you join the branched DataSets. If you keep them separate and eventually emit them, no intermediate data will be spilled.
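In other words, each branch can go straight to its own sink without being merged; a sketch, with placeholder output paths:

```java
// Each branch is emitted directly; with no join there is no
// branching-and-merging pattern, so no intermediate spilling.
x1.writeAsText("hdfs:///output/x1");  // placeholder path
x2.writeAsText("hdfs:///output/x2");  // placeholder path
env.execute("branching batch job");
```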