Reading compressed XML data

Sebastian Neef
Hi,

what's the best way to read a compressed (bz2/gz) XML file, splitting
it at a specific XML tag?

So far I've been using Hadoop's TextInputFormat in combination with
Mahout's XmlInputFormat ([0]) via env.readHadoopFile(). Whereas the
plain TextInputFormat can handle compressed data, the XmlInputFormat
can't for some reason.
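
Roughly, this is what I have at the moment. A simplified sketch; the
path and the tag names are placeholders, and I'm assuming the
mapreduce-API variant of the XmlInputFormat:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.mahout.classifier.bayes.XmlInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Tell Mahout's XmlInputFormat which tags delimit one record.
Job job = Job.getInstance();
job.getConfiguration().set("xmlinput.start", "<page>");
job.getConfiguration().set("xmlinput.end", "</page>");

// Each tuple's value should hold one <page>...</page> fragment.
DataSet<Tuple2<LongWritable, Text>> pages = env.readHadoopFile(
        new XmlInputFormat(), LongWritable.class, Text.class,
        "hdfs:///path/to/dump.xml.bz2", job);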

Is there a Flink-ish way to accomplish this?

Best regards,
Sebastian

[0]:
https://github.com/apache/mahout/blob/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/mahout/classifier/bayes/XmlInputFormat.java

Re: Reading compressed XML data

rmetzger0
Hi Sebastian,

I'm not aware of a better way of implementing this in Flink. You could implement your own XmlInputFormat using Flink's InputFormat abstractions, but you would end up with almost exactly the same code as Mahout / Hadoop.
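
Just to sketch what I mean (untested, class name and details made up): Flink's DelimitedInputFormat lets you use the closing tag as the record delimiter, and its FileInputFormat superclass already decompresses gzip/bzip2 transparently, albeit as a single non-splittable input split:

import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.core.fs.Path;

import java.nio.charset.StandardCharsets;

// Untested sketch: emits everything from a start tag up to the next
// end tag as one String record.
public class XmlElementInputFormat extends DelimitedInputFormat<String> {

    private final String startTag;
    private final String endTag;

    public XmlElementInputFormat(Path file, String startTag, String endTag) {
        this.startTag = startTag;
        this.endTag = endTag;
        setFilePath(file);
        // The delimiter is consumed by the superclass, so readRecord()
        // re-appends it below to keep the fragment well-formed.
        setDelimiter(endTag);
    }

    @Override
    public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) {
        String chunk = new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
        int start = chunk.indexOf(startTag);
        // Drop anything before the opening tag (XML header, whitespace
        // between records); a null return is skipped by the runtime.
        return start < 0 ? null : chunk.substring(start) + endTag;
    }
}

You would then read it with env.createInput(new XmlElementInputFormat(...)). But as said, this is essentially the same record-splitting logic as in the Hadoop/Mahout code.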
I wonder why the decompression with the XmlInputFormat doesn't work. Did you get any exception?

Regards,
Robert


Re: Reading compressed XML data

Sebastian Neef
Hi Robert,

sorry for the long delay.

> I wonder why the decompression with the XmlInputFormat doesn't work. Did
> you get any exception?

I didn't get any exception; it just seems to read nothing (or at
least doesn't match any opening/closing tags).

I dug a bit into the code and found that Mahout's XmlInputFormat [0]
extends TextInputFormat [1]. TextInputFormat uses the LineRecordReader
[2], which handles the compressed data.

However, Mahout's XmlRecordReader [3] does not contain that
compression handling, so I tried to build an XmlRecordReader that does
[4]. I use it to split the Wikipedia dumps into pages at the <page>
and </page> tags [5].

It does work, but sometimes misses some data, and I guess this is
because of the different splits. How do FileSplits work? Can a process
read beyond the FileSplit boundary or not?

I'm also a bit confused about why the Flink documentation says that
Bzip2 is not splittable [6]. AFAIK, Hadoop (and Flink in compatibility
mode) does support splittable compressed data.

I would appreciate some input/ideas/help with this.

All the best,
Sebastian


[0]:
https://github.com/apache/mahout/blob/master/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java

[1]:
https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java

[2]:
https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java

[3]:
https://github.com/apache/mahout/blob/master/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java#L64

[4]:
http://paste.gehaxelt.in/?69af3c91480b6bfb#ze+G/X9b3yTHfu1QW70aJioDvXWKoFFOCnLND1ow0sU=

[5]:
http://paste.gehaxelt.in/?e869d1f1f9f6f1be#kXrNaWXTNqLiHEKL4a6rWVMxhbVcmpXu24jGqJcap1A=

[6]:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#read-compressed-files

Re: Reading compressed XML data

Till Rohrmann
Hi Sebastian,

File input splits basically define the region of a file which a subtask will read. Thus, your file input format would have to break up the bzip2 file exactly at the borders of compressed blocks when generating the input splits. Otherwise, a subtask won't be able to decompress its part of the file.

I'm not sure why the documentation says that bzip2 is not splittable. I assume it's because our input format is not able to generate input splits for the compressed blocks of the bzip2 file. But in theory it should be possible.
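
Regarding your FileSplit question: yes, for uncompressed files a reader is allowed to read beyond its split boundary. That is how Hadoop's LineRecordReader handles records that straddle two splits, and a custom XmlRecordReader has to follow the same convention. A toy version of the rule (illustrative only, not Hadoop's actual code):

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;

// Every split except the first skips its first (potentially partial)
// record, and every split may read past its end to finish the last
// record it started. Together this assigns each record to exactly one
// split.
public class SplitBoundaryDemo {

    static List<String> readSplit(String file, long start, long length) throws IOException {
        List<String> records = new ArrayList<>();
        try (RandomAccessFile in = new RandomAccessFile(file, "r")) {
            long end = start + length;
            in.seek(start);
            if (start != 0) {
                in.readLine(); // not ours: the previous split finishes this record
            }
            // The file pointer may move beyond 'end' while completing a
            // record; reading across the split boundary is expected.
            while (in.getFilePointer() <= end) {
                String record = in.readLine();
                if (record == null) {
                    break; // end of file
                }
                records.add(record);
            }
        }
        return records;
    }
}

If your XmlRecordReader misses records, I would check that it applies both halves of this rule with respect to the XML tags instead of line breaks. For compressed input, the same approach additionally requires that every split starts at a compressed block boundary, as described above.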

Cheers,
Till


Re: Reading compressed XML data

Sebastian Neef
Hi Till,

just to clarify and for my understanding.

Let's assume we have the following Bzip2 file:

--------------------------------------------
|A.BA.B|A...B|A....|..BA.|...BA|....B|A...B|
--------------------------------------------
|1     |2    |3    |4    |5    |6    |7    | ("block number")

The FileInputFormat will need to split the bzip2 file at the specific
block boundaries (marked with pipes here).

If I understood you correctly, every subtask (the InputFormat which
gets a FileSplit passed to it) will/should only see a part of the
whole bzip2 file.

That is fine for blocks where the records (the "A..B" spans) lie
within the block's bounds.

I don't think I fully understand what happens when a record is split
across two or more blocks. Can a subtask which, for example, handles
block 3 read into the fourth block to complete the record?

Cheers,
Sebastian