Hey everyone,
I just noticed that when processing input splits from a DelimitedInputFormat (specifically, I have a text file with words in it), if the splitLength is 0, the entire read buffer is filled (see https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java#L577). I'm using XtreemFS as the underlying file system, which stripes files in blocks of 128 KB across storage servers. I have 8 physically separate nodes, and my input file is 1 MB, such that each node stores 128 KB of data. This is reported accurately to Flink (e.g. split sizes and hostnames).

Now when the splitLength becomes 0 at some point during processing (which it eventually will), the entire file is read in again, which kind of defeats the point of processing a split of length 0. Is this intended behavior? I've tried multiple hot-fixes, but they all ended up with the file not being read in its entirety. I would like to know the rationale behind this implementation, and maybe figure out a way around it.

Thanks in advance,
Robert

My GPG Key ID: 336E2680
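To illustrate the behavior described above, here is a paraphrased sketch of the buffer-fill decision (this is NOT the actual Flink source; the class name, method name, and signature are made up for illustration — only the "splitLength 0 fills the whole buffer" rule comes from the thread):

```java
// Hypothetical sketch, not Flink code: the read-size decision described above.
public class FillBufferSketch {

    // How many bytes to request from the stream on the next buffer fill.
    static int bytesToRead(long remainingSplitLength, int bufferSize) {
        if (remainingSplitLength <= 0) {
            // Nothing left in the split proper, but the current record may
            // still need completing: fall back to filling the whole buffer.
            // With a read buffer much larger than a 128 KB split, this is
            // where far more data than the split itself gets read.
            return bufferSize;
        }
        // Otherwise read at most the remainder of the split.
        return (int) Math.min(remainingSplitLength, bufferSize);
    }

    public static void main(String[] args) {
        System.out.println(bytesToRead(0, 1024 * 1024));          // full buffer
        System.out.println(bytesToRead(128 * 1024, 1024 * 1024)); // split remainder
    }
}
```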
Hi Robert!

This clearly sounds like unintended behavior. Thanks for reporting this. Apparently, the 0 line length was supposed to have a double meaning, but it goes haywire in this case. Let me try to come up with a fix for this...

Greetings,
Stephan

On Fri, Jul 10, 2015 at 6:05 PM, Robert Schmidtke <[hidden email]> wrote:
Hi Robert!

I did some debugging and added some tests. Turns out, this is actually expected behavior. It has to do with the splitting of the records. Because the splits are created without knowing the contents, a split boundary can fall either in the middle of a record or (by chance) exactly at the boundary of a record. To make each split handle this consistently without knowing what the others do, the contract is the following:

- Every split but the first initially skips over the partial record until the first delimiter.
- Each split reads to the next delimiter beyond the split boundary.

The case when the remaining split size is 0 is the point where the split has to read one more record (or complete the current record), so it gets one more chunk of data. The problem in your case is actually that the split size is so low that the "read buffer to complete the current record" operation reads the split twice.

Can you reduce the buffer size to something reasonable? You can also increase the split size. I think 128 KB splits will result in high coordination overhead for Flink, because these are distributed with RPC messages from the master (1 message per split).

Greetings,
Stephan

On Fri, Jul 10, 2015 at 6:55 PM, Stephan Ewen <[hidden email]> wrote:
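The contract above can be sketched as a small in-memory simulation (a toy illustration over a byte array with '\n' delimiters, not Flink code; the class and method names are made up). Note how a record that starts exactly at a split's end boundary is still read by that split, and the next split skips it — the "one more record" case:

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation (not Flink code) of the split-reading contract above.
public class SplitContractDemo {

    // Returns the records owned by the split [start, start + length).
    static List<String> readSplit(byte[] data, int start, int length) {
        List<String> out = new ArrayList<>();
        int pos = start;
        // Every split but the first skips the (possibly partial) record at
        // its start; the previous split is responsible for completing it.
        if (start > 0) {
            while (pos < data.length && data[pos] != '\n') pos++;
            if (pos < data.length) pos++; // step past the delimiter
        }
        int end = start + length;
        // Read records that begin at or before the split end. A record that
        // begins exactly at the boundary, or crosses it, is read in full:
        // this is the "read one more record when the remaining split length
        // is 0" case discussed above.
        while (pos <= end && pos < data.length) {
            int recStart = pos;
            while (pos < data.length && data[pos] != '\n') pos++;
            out.add(new String(data, recStart, pos - recStart));
            if (pos < data.length) pos++; // consume the delimiter
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] data = "aaa\nbbb\nccc\n".getBytes();
        // Boundary at byte 4 falls exactly on a record start: the first
        // split reads the extra record "bbb", the second split skips it.
        System.out.println(readSplit(data, 0, 4)); // [aaa, bbb]
        System.out.println(readSplit(data, 4, 4)); // [ccc]
        System.out.println(readSplit(data, 8, 4)); // []
    }
}
```

Every record is produced by exactly one split, without any coordination between splits — which is why the zero-remaining-length read exists at all.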
Hi Stephan,

I figured as much, since 128 KB is a split size that is not commonly used in large-scale data processing engines. I will go for increasing the split size to reduce the coordination overhead for Flink. It just so happened that my small toy example brought up the issue. Thanks for clearing this up.

Robert

On Sun, Jul 12, 2015 at 9:21 PM, Stephan Ewen <[hidden email]> wrote:
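For reference, both knobs discussed in the thread can be set on the input format before it is used. A sketch, assuming the FileInputFormat/DelimitedInputFormat setters of that era; the path and sizes are made-up examples, not values from the thread:

```java
// Sketch of the tuning suggested above; path and sizes are examples only.
TextInputFormat format = new TextInputFormat(new Path("xtreemfs://example-host/input.txt"));

// Larger minimum split size -> fewer splits -> fewer RPC messages
// from the master (one per split).
format.setMinSplitSize(32 * 1024 * 1024); // 32 MB

// A smaller read buffer limits how much extra data the
// "complete the current record" read can pull in past the split end.
format.setBufferSize(64 * 1024); // 64 KB
```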