Hi all,
I am evaluating Apache Flink for processing large sets of Geospatial data. The use case I am working on will involve reading a certain number of GPX files stored on Amazon S3. GPX files are actually XML files and therefore cannot be read on a line by line basis. One GPX file will produce one or more Java objects that will contain the geospatial data we need to process (mostly a list of geographical points). To cover this use case I tried to extend the FileInputFormat class: public class WholeFileInputFormat extends FileInputFormat<String> { private boolean hasReachedEnd = false; public WholeFileInputFormat() { unsplittable = true; } @Override public void open(FileInputSplit fileSplit) throws IOException { super.open(fileSplit); hasReachedEnd = false; } @Override public String nextRecord(String reuse) throws IOException { // uses apache.commons.io.IOUtils String fileContent = IOUtils.toString(stream, StandardCharsets.UTF_8); hasReachedEnd = true; return fileContent; } @Override public boolean reachedEnd() throws IOException { return hasReachedEnd; } } This class returns the content of the whole file as a string. Is this the right approach? It seems to work when run locally with local files but I wonder if it would run into problems when tested in a cluster. Thanks in advance. Andrea. -- Andrea Cisternino, Erlangen, Germany GitHub: http://github.com/acisternino GitLab: https://gitlab.com/u/acisternino |
Hi Andrea, How large are these data files? The implementation you've mentioned here is only usable if they are very small. If so, you're fine. If not read on... Processing XML input files in parallel is tricky. It's not a great format for this type of processing as you've seen. They are tricky to split and more complex to iterate through than simpler formats. However, others have implemented XMLInputFormat classes for Hadoop. Have you looked at these? Mahout has an XMLInputFormat implementation for example but I haven't used it directly. Anyway, you can reuse Hadoop InputFormat implementations in Flink directly. This is likely a good route. See Flink's HadoopInputFormat class. -Jamie On Tue, Jun 7, 2016 at 7:35 AM, Andrea Cisternino <[hidden email]> wrote:
|
You can use Mahout XMLInputFormat with Flink - HAdoopInputFormat definitions. See
http://stackoverflow.com/questions/29429428/xmlinputformat-for-apache-flink On Tue, Jun 7, 2016 at 10:11 PM, Jamie Grier <[hidden email]> wrote:
|
Jamie, Suneel thanks a lot, your replies have been very helpful. I will definitely take a look at XMLInputFormat.On 8 June 2016 at 04:23, Suneel Marthi <[hidden email]> wrote:
-- Andrea Cisternino, Erlangen, Germany
LinkedIn: http://www.linkedin.com/in/andreacisternino GitHub: http://github.com/acisternino |
Hi, I am replying to myself for the records and to provide an update on what I am trying to do. I have looked into Mahout's XmlInputFormat class but unfortunately it doesn't solve my problem. My exploratory work with Flink tries to reproduce the key steps that we already perform in a quite large Apache Spark application that runs on Amazon EMR. For our use case the GPX files are not collections of independent records that could be split and analyzed in parallel. Instead, more than 95% of them are considered by our algorithms as a single record (a so called "Track"). IOW, we would not gain anything by splitting the files because in the vast majority of the cases we would get only one slice out of one file defeating the purpose of splitting them in the first place. GPX files have also another nasty property: they come in two versions (1.0 and 1.1, see more at http://www.topografix.com/gpx.asp.) Important attributes of a point (e.g. speed) are encoded very differently in the two versions and therefore the parsing logic must be different, at least for some sections of the file. To recognize the file version, the parser must look at the entire file because this information is available only in the namespace declaration of the root element. On top of all of this I think that, because of their small size and because we read all of them from S3, splitting within the file is not an issue. Can you confirm that? Going back to my WholeFileInputFormat class I am worried about setting the unsplittable attribute to true in the constructor. Will the constructor be invoked also when running in cluster? Well, I think i really need to setup a small Flink cluster and try it myself :) Thanks again. Andrea. On 8 June 2016 at 08:16, Andrea Cisternino <[hidden email]> wrote:
-- Andrea Cisternino, Erlangen, Germany
LinkedIn: http://www.linkedin.com/in/andreacisternino GitHub: http://github.com/acisternino |
Hi, setting the unsplittable attribute in the constructor is fine. The field's value will be send to the cluster. So what happens is that you initialize the input format in your client program. Then, its serialized, send over the network to the machines and deserilaized again. So the value you've set in the ctor will end up in the cluster. On Fri, Jun 10, 2016 at 10:53 AM, Andrea Cisternino <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |