DataStream Batch Execution Mode and large files.

Marco Villalobos-2
Hi,

I am using the DataStream API in Batch Execution Mode, and my "source" is an S3 bucket with about 500 GB of data spread across many files.

Where does Flink store the results of processed / produced data between tasks?

There is no way that 500 GB will fit in memory, so I am very curious how that happens.

Can somebody please explain?

Thank you.

Marco A. Villalobos
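For context, the setup being asked about is a DataStream job with the runtime mode set to BATCH. A minimal sketch (the job name and pipeline body are placeholders, not from the thread):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchModeJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Run this DataStream pipeline in batch execution mode. In this mode
        // all-to-all exchanges between tasks become blocking: each stage's
        // output is fully materialized before the next stage consumes it.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // ... define sources, transformations, and sinks here ...

        env.execute("batch-mode-job");
    }
}
```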
Re: DataStream Batch Execution Mode and large files.

Yun Gao
Hi Marco,

In BATCH mode, all ALL_TO_ALL edges are marked as blocking
and use intermediate files to transfer data. Flink currently supports hash shuffle
and sort shuffle for blocking edges [1]; both store the intermediate files in
the directories configured by io.tmp.dirs [2].


[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#io-tmp-dirs
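A minimal flink-conf.yaml fragment illustrating the setting referenced above (the mount point is a hypothetical example; on EMR you would point this at local or EBS-backed storage with enough capacity for the shuffle data):

```yaml
# Directories for temporary files, including blocking-shuffle
# intermediate files. Multiple directories can be listed,
# separated by commas, to spread I/O across disks.
io.tmp.dirs: /mnt/flink-tmp
```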
Re: DataStream Batch Execution Mode and large files.

Marco Villalobos-2
Thank you very much. You've been very helpful.  

Since my intermediate results are large, I suspect that io.tmp.dirs must point to the local file system. Thus, since I use EMR, I'll need to configure EBS volumes with enough capacity.
