Hi,
What's the best way to read a compressed (bz2/gz) XML file, splitting it at a specific XML tag? So far I've been using Hadoop's TextInputFormat in combination with Mahout's XmlInputFormat ([0]) via env.readHadoopFile(). While the plain TextInputFormat can handle compressed data, the XmlInputFormat can't, for some reason. Is there a Flink-ish way to accomplish this?

Best regards,
Sebastian

[0]: https://github.com/apache/mahout/blob/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/mahout/classifier/bayes/XmlInputFormat.java
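Because a gzip stream can only be decoded from its beginning, one straightforward (non-parallel) option is to decompress the whole file sequentially and do the tag splitting on the decoded bytes. The following is a minimal JDK-only sketch, loosely modeled on the byte-matching loop in Mahout's XmlRecordReader. GzipXmlRecords, readUntilMatch and readRecords are hypothetical names, gzip stands in for both formats because the JDK has no bzip2 decoder, and the input is an in-memory sample rather than a real dump.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipXmlRecords {

    // Reads bytes until `match` has been seen, copying them into `buf` when buf != null.
    // Returns false if EOF comes first. The simple restart on mismatch is only correct
    // for patterns whose first byte does not reappear inside them (true for "<page>").
    static boolean readUntilMatch(InputStream in, byte[] match, ByteArrayOutputStream buf)
            throws IOException {
        int i = 0;
        while (true) {
            int b = in.read();
            if (b == -1) {
                return false;
            }
            if (buf != null) {
                buf.write(b);
            }
            if (b == (match[i] & 0xff)) {
                if (++i == match.length) {
                    return true;
                }
            } else {
                // the current byte may itself start a new match
                i = (b == (match[0] & 0xff)) ? 1 : 0;
            }
        }
    }

    // Collects every startTag ... endTag record from an already decompressed stream.
    static List<String> readRecords(InputStream in, String startTag, String endTag)
            throws IOException {
        byte[] start = startTag.getBytes(StandardCharsets.UTF_8);
        byte[] end = endTag.getBytes(StandardCharsets.UTF_8);
        List<String> records = new ArrayList<>();
        while (readUntilMatch(in, start, null)) {   // skip to the next start tag
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            buf.write(start, 0, start.length);       // keep the start tag in the record
            if (!readUntilMatch(in, end, buf)) {     // copy everything through the end tag
                break;                               // unterminated record at EOF
            }
            records.add(new String(buf.toByteArray(), StandardCharsets.UTF_8));
        }
        return records;
    }

    static byte[] gzip(String s) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(s.getBytes(StandardCharsets.UTF_8));
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        String xml = "<mediawiki><page><title>A</title></page>"
                + "<page><title>B</title></page></mediawiki>";
        // In a real job this would be a FileInputStream on the .gz dump (hypothetical setup).
        try (InputStream in = new BufferedInputStream(
                new GZIPInputStream(new ByteArrayInputStream(gzip(xml))))) {
            for (String record : readRecords(in, "<page>", "</page>")) {
                System.out.println(record);
            }
        }
    }
}
```

This assumes the split tag never nests inside itself, which holds for <page> in MediaWiki dumps.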
Hi Sebastian,

I'm not aware of a better way of implementing this in Flink. You could implement your own XmlInputFormat using Flink's InputFormat abstractions, but you would end up with almost exactly the same code as Mahout/Hadoop.

I wonder why the decompression with the XmlInputFormat doesn't work. Did you get any exception?

Regards,
Robert

On Wed, Jan 11, 2017 at 4:31 PM, Sebastian Neef <[hidden email]> wrote:
Hi Robert,
Sorry for the long delay.

> I wonder why the decompression with the XmlInputFormat doesn't work. Did
> you get any exception?

I didn't get any exception; it just seems to read nothing (or at least doesn't match any opening/closing tags). I dug a bit into the code and found out that Mahout's XmlInputFormat [0] extends the TextInputFormat [1]. TextInputFormat then uses the LineRecordReader [2], which handles the compressed data. However, Mahout's XmlRecordReader [3] does not contain that compression handling.

So I tried to build an XmlRecordReader that does [4]. I use it to split the Wikipedia dumps into pages at the <page> and </page> tags [5]. It does work, but it sometimes misses some data, and I guess this is because of the different splits. How do FileSplits work? Can a process read beyond the FileSplit boundary or not?

I'm also a bit confused about why the Flink docs say that bzip2 is not splittable [6]. AFAIK, Hadoop (and Flink in compatibility mode) does support splittable compressed data.

I would appreciate some input/ideas/help with this.
All the best,
Sebastian

[0]: https://github.com/apache/mahout/blob/master/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java
[1]: https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java
[2]: https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
[3]: https://github.com/apache/mahout/blob/master/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java#L64
[4]: http://paste.gehaxelt.in/?69af3c91480b6bfb#ze+G/X9b3yTHfu1QW70aJioDvXWKoFFOCnLND1ow0sU=
[5]: http://paste.gehaxelt.in/?e869d1f1f9f6f1be#kXrNaWXTNqLiHEKL4a6rWVMxhbVcmpXu24jGqJcap1A=
[6]: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#read-compressed-files
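Sebastian's diagnosis can be illustrated without Hadoop: a record reader that matches tags on the raw file bytes never sees <page> in a .gz file, because the stream starts with gzip framing rather than XML. As far as I know, LineRecordReader looks up a codec via CompressionCodecFactory.getCodec(path) and wraps the input with codec.createInputStream(...) before reading lines. The JDK-only sketch below imitates that step with a file-extension check; CodecAwareOpen, open, countTag and gzip are hypothetical names, and bzip2 is rejected because the JDK has no bzip2 decoder.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CodecAwareOpen {

    // Picks a decompressor from the file name before any tag matching happens.
    // Only gzip is handled here because the JDK ships no bzip2 decoder.
    static InputStream open(String fileName, InputStream raw) throws IOException {
        if (fileName.endsWith(".gz")) {
            return new BufferedInputStream(new GZIPInputStream(raw));
        }
        if (fileName.endsWith(".bz2")) {
            throw new UnsupportedOperationException("no bzip2 decoder in the JDK: " + fileName);
        }
        return new BufferedInputStream(raw); // assume uncompressed XML
    }

    // Counts occurrences of `tag` (its first byte must not reappear inside it, e.g. "<page>").
    static int countTag(InputStream in, String tag) throws IOException {
        byte[] t = tag.getBytes(StandardCharsets.UTF_8);
        int count = 0;
        int i = 0;
        int b;
        while ((b = in.read()) != -1) {
            if (b == (t[i] & 0xff)) {
                if (++i == t.length) {
                    count++;
                    i = 0;
                }
            } else {
                i = (b == (t[0] & 0xff)) ? 1 : 0;
            }
        }
        return count;
    }

    static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] xml = "<w><page>1</page><page>2</page><page>3</page></w>"
                .getBytes(StandardCharsets.UTF_8);
        byte[] compressed = gzip(xml);

        // A reader without codec handling starts at the gzip magic bytes, not at XML.
        System.out.printf("first bytes: %02x %02x%n", compressed[0], compressed[1]); // 1f 8b
        InputStream in = open("dump.xml.gz", new ByteArrayInputStream(compressed));
        System.out.println("pages after decoding: " + countTag(in, "<page>"));       // 3
    }
}
```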
Hi Sebastian,

File input splits basically define the region of a file that a subtask will read. Thus, your file input format would have to break up the bzip2 file exactly at the boundaries of the compressed blocks when generating the input splits. Otherwise a subtask won't be able to decompress its split.

I'm not sure why the documentation says that bzip2 is not splittable. I assume it's because our input format is not able to generate input splits for the compressed blocks of a bzip2 file. But in theory it should be possible.

Cheers,
Till

On Thu, Feb 16, 2017 at 4:41 PM, Sebastian Neef <[hidden email]> wrote:
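Generating splits at compressed-block boundaries first requires locating the blocks. In the bzip2 format each compressed block starts with the 48-bit magic 0x314159265359, and blocks are bit-aligned rather than byte-aligned, so a scanner has to slide over individual bits. A minimal sketch follows; Bzip2BlockMarkers, findBlockMagic and withMagicAt are hypothetical names, and the input is a synthetic buffer rather than a real .bz2 file. The same bit pattern could in principle also occur by chance inside compressed data, so a real split generator would have to validate each candidate. As far as I know, Hadoop's BZip2Codec takes a different route: each split starts at an arbitrary byte offset and the decompressor skips forward to the next block marker.

```java
import java.util.ArrayList;
import java.util.List;

public class Bzip2BlockMarkers {

    // 48-bit magic that starts every compressed block in a bzip2 stream.
    static final long BLOCK_MAGIC = 0x314159265359L;
    static final long MASK_48 = (1L << 48) - 1;

    // Returns the bit offsets (MSB-first within each byte) at which the block magic occurs.
    static List<Long> findBlockMagic(byte[] data) {
        List<Long> offsets = new ArrayList<>();
        long window = 0;   // the last 48 bits read
        long bitsRead = 0;
        for (byte value : data) {
            for (int bit = 7; bit >= 0; bit--) {
                window = ((window << 1) | ((value >> bit) & 1)) & MASK_48;
                bitsRead++;
                if (bitsRead >= 48 && window == BLOCK_MAGIC) {
                    offsets.add(bitsRead - 48);
                }
            }
        }
        return offsets;
    }

    // Builds a zero-filled test buffer with the magic written at an arbitrary bit offset.
    static byte[] withMagicAt(int bitOffset, int totalBytes) {
        byte[] out = new byte[totalBytes];
        for (int i = 0; i < 48; i++) {
            if (((BLOCK_MAGIC >>> (47 - i)) & 1) == 1) {
                int pos = bitOffset + i;
                out[pos / 8] |= (byte) (0x80 >>> (pos % 8));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(findBlockMagic(withMagicAt(32, 12))); // [32] (byte-aligned)
        System.out.println(findBlockMagic(withMagicAt(13, 10))); // [13] (not byte-aligned)
    }
}
```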
Hi Till,
Just to clarify, for my own understanding, let's assume we have the following bzip2 file:

    --------------------------------------------
    |A.BA.B|A...B|A....|..BA.|...BA|....B|A...B|
    --------------------------------------------
    |1     |2    |3    |4    |5    |6    |7    |   ("block number")

The FileInputFormat will need to split the bzip2 file at the specific blocks (marked with pipes here). If I understood you correctly, every subtask (the InputFormat that is passed a FileSplit) will/should only see part of the whole bzip2 file. That is fine for blocks where the records (an "A..B" sequence) lie within the block's bounds.

I think I don't fully understand what happens when a record is split between two or more blocks. Can a subtask that, for example, handles block 3 read into the fourth block to complete the record?

Cheers,
Sebastian
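The usual Hadoop convention for records that cross a split boundary, which Mahout's XmlRecordReader also appears to follow, is that a split owns every record whose start tag begins inside it, and the reader keeps reading past the split end until it finds the matching end tag; the next split simply skips ahead to its own first start tag. A small simulation on the diagram above, with uncompressed characters standing in for decompressed blocks, shows that every record is then read exactly once. SplitOwnershipDemo and readSplit are hypothetical names, and for compressed input this presumes the decompressor can keep going into the following block once the split end is passed.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitOwnershipDemo {

    // Returns the records owned by the split [start, end): those whose start tag
    // begins inside the split. The end tag may lie beyond `end`.
    static List<String> readSplit(String data, int start, int end,
                                  String startTag, String endTag) {
        List<String> records = new ArrayList<>();
        int pos = start;
        while (true) {
            int s = data.indexOf(startTag, pos);
            if (s < 0 || s >= end) {
                break; // next record starts in a later split, so it is not ours
            }
            int e = data.indexOf(endTag, s + startTag.length());
            if (e < 0) {
                break; // truncated input: no closing tag anywhere
            }
            int recordEnd = e + endTag.length(); // may be past `end`: read into the next block
            records.add(data.substring(s, recordEnd));
            pos = recordEnd;
        }
        return records;
    }

    public static void main(String[] args) {
        // The 7 "blocks" from the diagram; 'A' opens a record, 'B' closes it.
        String[] blocks = {"A.BA.B", "A...B", "A....", "..BA.", "...BA", "....B", "A...B"};
        String data = String.join("", blocks);
        int offset = 0;
        int total = 0;
        for (int i = 0; i < blocks.length; i++) {
            int end = offset + blocks[i].length();
            List<String> recs = readSplit(data, offset, end, "A", "B");
            System.out.println("block " + (i + 1) + ": " + recs); // block 3: [A......B]
            total += recs.size();
            offset = end;
        }
        System.out.println("total records: " + total); // 7
    }
}
```

Block 3 finishes its record inside block 4, while block 4 skips the leading "..B" and starts at its own "A"; block 6 owns no record at all.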