Hi,
I'm currently examining the I/O patterns of Flink, and I'd like to know when/how Flink goes to disk. Let me give an introduction of what I have done so far.

I am running TeraGen (from the Hadoop examples package) + TeraSort (https://github.com/robert-schmidtke/terasort) on a 16 node cluster, each node with 64 GiB of memory, 2x32 cores, and roughly half a terabyte of disk. I'm using YARN and HDFS. The underlying file system is XFS.

Before running TeraGen and TeraSort, I reset the XFS counters to zero, and after TeraGen + TeraSort are finished, I dump the XFS counters again. Accumulated over the entire cluster I get 3 TiB of writes and 3.2 TiB of reads. What I'd have expected would be 2 TiB of writes (1 for TeraGen, 1 for TeraSort) and 1 TiB of reads (during TeraSort).

Unsatisfied by the coarseness of these numbers, I developed an HDFS wrapper that logs file system statistics for each call to hdfs://..., such as start time/end time, no. of bytes read/written etc. I can plot these numbers and see what I expect: during TeraGen I have 1 TiB of writes to hdfs://..., during TeraSort I have 1 TiB of reads from and 1 TiB of writes to hdfs://... So far, so good.

This still did not explain the disk I/O, so I added bytecode instrumentation to a range of Java classes, like FileIn/OutputStream, RandomAccessFile, FileChannel, ZipFile, multiple *Buffer classes for memory mapped files etc., and collect the same statistics: start/end of a read from/write to disk, no. of bytes involved and such. I can plot these numbers too and see that the HDFS JVMs write 1 TiB of data to disk during TeraGen (expected) and read and write 1 TiB from and to disk during TeraSort (expected).

Sorry for the enormous introduction, but now there's finally the interesting part: Flink's JVMs read from and write to disk 1 TiB of data each during TeraSort. I suspect there is some sort of spilling involved, potentially because I have not done the setup properly. But that is not the crucial point: my statistics give a total of 3 TiB of writes to disk (2 TiB for HDFS, 1 TiB for Flink), which agrees with the XFS counters from above. However, my statistics only give 2 TiB of reads from disk (1 TiB for HDFS, 1 TiB for Flink), so I'm missing an entire TiB of reads from disk somewhere. I have done the same with Hadoop TeraSort, and there I'm not missing any data, meaning my statistics agree with XFS for TeraSort on Hadoop, which is why I suspect there are some cases where Flink goes to disk without me noticing it.

So here, finally, is the question: in which cases does Flink go to disk, and how does it do so (meaning precisely which Java classes are involved, so I can check my bytecode instrumentation)? This would also include any kind of resource distribution via HDFS/YARN, I guess (like JAR files and I don't know what). Seeing that I'm missing an amount of data equal to the size of my input set, I'd suspect there must be some sort of shuffling/spilling at play here, but I'm not sure. Maybe there is also some sort of remote I/O involved via sockets or so that I'm missing.

Any hints as to where Flink might incur disk I/O are greatly appreciated! I'm also happy to do the digging myself, once pointed to the proper packages in the Apache Flink source tree (I have done my fair share of inspection already, but could not be sure whether or not I have missed something). Thanks a lot in advance!

Robert

My GPG Key ID: 336E2680
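The accounting in the message above can be written down as a small calculation. The sketch below only restates the TiB figures from the message; the dictionary layout and the helper name `unaccounted` are made up for illustration and are not part of any tool mentioned in the thread.

```python
# All figures are in TiB and are copied from the message above.
# The structure and the helper name are illustrative assumptions.
xfs = {"writes": 3.0, "reads": 3.2}

# Per-JVM totals as seen by the bytecode instrumentation.
instrumented = {
    "hdfs": {"writes": 2.0, "reads": 1.0},   # TeraGen + TeraSort
    "flink": {"writes": 1.0, "reads": 1.0},  # TeraSort only
}


def unaccounted(kind):
    """TiB reported by XFS that the instrumentation did not see."""
    seen = sum(jvm[kind] for jvm in instrumented.values())
    # Round to hide floating-point noise such as 1.2000000000000002.
    return round(xfs[kind] - seen, 3)


for kind in ("writes", "reads"):
    print(f"{kind}: {unaccounted(kind)} TiB unaccounted for")
```

With these figures the writes reconcile exactly, while the gap on the read side is the difference between the 3.2 TiB reported by XFS and the 2 TiB seen by the instrumentation.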
Hi,

I have already looked at the UnilateralSortMerger, concluding that all I/O eventually goes via SegmentReadRequest and SegmentWriteRequest (which in turn use java.nio.channels.FileChannel) in AsynchronousFileIOChannel. Are there more interaction points between Flink and the underlying file system that I might want to consider?

Thanks!
Robert

On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young <[hidden email]> wrote:
Hey Robert,
for batch, that should cover the relevant spilling code. If the records are >= 5 MB, the SpillingAdaptiveSpanningRecordDeserializer will spill incoming records as well. But that should be covered by the FileChannel instrumentation as well?

– Ufuk

On Tue, Apr 18, 2017 at 3:57 PM, Robert Schmidtke <[hidden email]> wrote:
> [...]
>
> On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young <[hidden email]> wrote:
>> Hi,
>>
>> You probably want to check out UnilateralSortMerger.java; this is the class that is responsible for external sorting in Flink. Here is a short description of how it works: there are three threads working together, one for reading, one for sorting partial data in memory, and one for spilling. Flink first figures out how much memory it can use for the in-memory sort and manages it as MemorySegments. Once this memory runs out, the sorting thread takes over the memory and does the in-memory sort (for more details about in-memory sorting, see NormalizedKeySorter). After this, the spilling thread writes the sorted data to disk and makes the memory available again for reading. This is repeated until all data has been processed.
>>
>> Normally, the data is read twice (once from the source, once from disk) and written once, but if you spill too many files, Flink will first merge some of the files to make sure the last merge step does not exceed some limit (default 128). Hope this helps.
>>
>> Best,
>> Kurt
>>
>> On Fri, Apr 7, 2017 at 4:20 PM, Robert Schmidtke <[hidden email]> wrote:
>>> [...]
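To make Kurt's point about intermediate merges concrete, here is a rough model of how a fan-in limit adds spill I/O beyond a single write and a single read. It is a sketch under stated assumptions, not Flink's actual merge policy: the function name `spill_io` and the rule "merge just enough of the smallest runs" are invented for illustration; only the default limit of 128 comes from the message above.

```python
def spill_io(run_sizes, max_fan_in=128):
    """Return (written, read) volume to and from spill files.

    Simplified model, NOT Flink's real policy: while there are more sorted
    runs than max_fan_in, merge just enough of the smallest runs into one
    new run so that the final merge stays within the limit.
    """
    if max_fan_in < 2:
        raise ValueError("max_fan_in must be at least 2")

    runs = sorted(run_sizes)
    written = sum(runs)  # every sorted run is spilled once
    read = 0

    while len(runs) > max_fan_in:
        # Merge only as many runs as needed, at most max_fan_in at a time.
        k = min(max_fan_in, len(runs) - max_fan_in + 1)
        merged = sum(runs[:k])
        read += merged      # the merged runs are read back ...
        written += merged   # ... and written out again as one larger run
        runs = sorted(runs[k:] + [merged])

    read += sum(runs)  # the final merge reads every remaining run once
    return written, read


# 130 equally sized runs with a limit of 128 need one intermediate merge.
print(spill_io([1] * 130))
```

In this model, spill traffic equals the spilled data volume as long as the number of runs stays within the limit, and grows beyond it once intermediate merges are needed.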
Hi Ufuk,

thanks for coming back to me on this. The records are 100 bytes in size, the benchmark being TeraSort, so that should not be an issue. I have played around with the input size, and here are my observations:

128 GiB input: 0 spilling in Flink.
256 GiB input: 88 GiB spilling in Flink (so 88 GiB of reads, 88 GiB of writes), and my instrumentation covers all of it.
384 GiB input: 391 GiB spilling in Flink, and I cover all of it.
512 GiB input: 522 GiB spilling in Flink, but I miss 140 GiB of it.
640 GiB input: 653 GiB spilling in Flink, but I miss 281 GiB of it.
768 GiB input: 784 GiB spilling in Flink, but I miss 490 GiB of it.
896 GiB input: 914 GiB spilling in Flink, but I miss 662 GiB of it.
1024 GiB input: 1045 GiB spilling in Flink, but I miss 968 GiB of it.

So regardless of how well configured my system is and whether spilling is even necessary, it seems that with larger spilling amounts, the way the data is spilled changes (and I start missing larger and larger portions of the I/O, until almost 100%).

Since I have written the instrumentation myself, I cannot guarantee that it is flawless, and I might have missed something. I'm currently looking into how the file channels are being accessed in parallel by multiple threads, which I cover as well and my tests verify it, but maybe there are special access patterns here.

Robert

On Mon, Apr 24, 2017 at 2:25 PM, Ufuk Celebi <[hidden email]> wrote:
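The trend in these observations is easier to see as a share of the spilled volume. The snippet below only restates the GiB figures from the message; the tuple layout and the helper name `missed_percent` are illustrative assumptions.

```python
# (input GiB, spilled GiB, GiB missed by the instrumentation),
# copied from the observations in the message above.
observations = [
    (128, 0, 0), (256, 88, 0), (384, 391, 0), (512, 522, 140),
    (640, 653, 281), (768, 784, 490), (896, 914, 662), (1024, 1045, 968),
]


def missed_percent(spilled, missed):
    """Share of the spilled volume that the instrumentation did not see."""
    if spilled == 0:
        return 0.0
    return round(100.0 * missed / spilled, 1)


for input_gib, spilled, missed in observations:
    share = missed_percent(spilled, missed)
    print(f"{input_gib:>5} GiB input: {share:5.1f}% of spill I/O missed")
```

For these figures the missed share rises steadily from 0% at 384 GiB of input to roughly 93% at 1024 GiB.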
Hi Robert,
Any updates on the below for the community?

Thanks,
M

On Tue, Apr 25, 2017 at 8:50 AM, Robert Schmidtke <[hidden email]> wrote:
Hey Martin,

I'm still on it. I have switched to analyzing the flink-runtime tests, as I observe similar divergence there. I'm not sure how long it'll take, but if I find something I'll make sure to let you all know :)

Robert

On Sat, Apr 29, 2017 at 3:12 PM, Martin Eden <[hidden email]> wrote:
Minor update: I have executed the flink-runtime tests on XFS, Lustre and DVS (Cray DataWarp), and I observe divergences on XFS and Lustre, but not on DVS. It turns out that cached reads are reported by the file systems as well, so I don't think caching is an issue here. There might still be some threading issues which I do not cover, and maybe for some reason DVS serializes access, which is why my statistics and DVS agree to 100%. I'll get more experiments going and report back.

Robert

On Sat, Apr 29, 2017 at 4:53 PM, Robert Schmidtke <[hidden email]> wrote: