Reading whole files (from S3)


Andrea Cisternino
Hi all,

I am evaluating Apache Flink for processing large sets of geospatial data.
The use case I am working on involves reading a number of GPX files stored on Amazon S3.

GPX files are actually XML files and therefore cannot be read on a line-by-line basis.
One GPX file will produce one or more Java objects containing the geospatial data we need to process (mostly a list of geographical points).

To cover this use case, I tried to extend the FileInputFormat class:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;

public class WholeFileInputFormat extends FileInputFormat<String>
{
  private boolean hasReachedEnd = false;

  public WholeFileInputFormat() {
    // Each file becomes exactly one record, so the file must never be split.
    unsplittable = true;
  }

  @Override
  public void open(FileInputSplit fileSplit) throws IOException {
    super.open(fileSplit);
    hasReachedEnd = false;
  }

  @Override
  public String nextRecord(String reuse) throws IOException {
    // Read the split's stream into a single String (uses org.apache.commons.io.IOUtils).
    String fileContent = IOUtils.toString(stream, StandardCharsets.UTF_8);
    hasReachedEnd = true;
    return fileContent;
  }

  @Override
  public boolean reachedEnd() throws IOException {
    return hasReachedEnd;
  }
}


This class returns the content of the whole file as a string.
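
For reference, this is roughly how I plug the format into a small test job (a minimal sketch; the class name, the S3 path, and parseTrack() are placeholders for our actual code):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class GpxJob {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Point the format at the S3 prefix holding the GPX files (placeholder path).
    WholeFileInputFormat format = new WholeFileInputFormat();
    format.setFilePath("s3://my-bucket/gpx/");

    // Each element of the DataSet is the full content of one GPX file.
    DataSet<String> gpxFiles = env.createInput(format);

    // Downstream we would map each XML string to our domain objects, e.g.:
    // DataSet<Track> tracks = gpxFiles.map(xml -> parseTrack(xml));

    gpxFiles.print();
  }
}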

Is this the right approach?
It seems to work when run locally with local files but I wonder if it would
run into problems when tested in a cluster.

Thanks in advance.
  Andrea.

--
Andrea Cisternino, Erlangen, Germany
GitHub: http://github.com/acisternino
GitLab: https://gitlab.com/u/acisternino

Re: Reading whole files (from S3)

Jamie Grier
Hi Andrea,

How large are these data files? The implementation you've mentioned here is only usable if they are very small. If so, you're fine. If not, read on...

Processing XML input files in parallel is tricky. It's not a great format for this type of processing, as you've seen: XML files are hard to split and more complex to iterate through than simpler formats. However, others have implemented XMLInputFormat classes for Hadoop. Have you looked at these? Mahout has an XMLInputFormat implementation, for example, but I haven't used it directly.

Anyway, you can reuse Hadoop InputFormat implementations in Flink directly. This is likely a good route. See Flink's HadoopInputFormat class.
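
A rough, untested sketch of what that wrapping might look like with Mahout's XmlInputFormat (the record-delimiting tags, the S3 path, and the Mahout package name are assumptions you'd need to adapt to your setup):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.mahout.text.wikipedia.XmlInputFormat;

public class XmlFromS3 {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Tell XmlInputFormat which element delimits one record ("<trk>" is a placeholder)
    // via the "xmlinput.start"/"xmlinput.end" keys it reads from the Hadoop config.
    Job job = Job.getInstance();
    job.getConfiguration().set("xmlinput.start", "<trk>");
    job.getConfiguration().set("xmlinput.end", "</trk>");
    FileInputFormat.addInputPath(job, new Path("s3://my-bucket/gpx/"));

    // Wrap the Hadoop format so Flink can read it; each record is one XML fragment.
    HadoopInputFormat<LongWritable, Text> xmlInput =
        new HadoopInputFormat<>(new XmlInputFormat(), LongWritable.class, Text.class, job);

    DataSet<Tuple2<LongWritable, Text>> fragments = env.createInput(xmlInput);
    fragments.print();
  }
}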

-Jamie
  

--

Jamie Grier
data Artisans, Director of Applications Engineering


Re: Reading whole files (from S3)

Suneel Marthi
You can use Mahout's XMLInputFormat with Flink via the HadoopInputFormat wrapper. See

http://stackoverflow.com/questions/29429428/xmlinputformat-for-apache-flink



Re: Reading whole files (from S3)

Andrea Cisternino
Jamie, Suneel, thanks a lot, your replies have been very helpful.

I will definitely take a look at XMLInputFormat.

In any case, the files are not very big: on average 100-200 kB, up to a maximum of a couple of MB.


--
Andrea Cisternino, Erlangen, Germany
LinkedIn: http://www.linkedin.com/in/andreacisternino
GitHub: http://github.com/acisternino

Re: Reading whole files (from S3)

Andrea Cisternino
Hi,

I am replying to myself for the record and to provide an update on what I am trying to do.

I have looked into Mahout's XmlInputFormat class, but unfortunately it doesn't solve my problem.

My exploratory work with Flink tries to reproduce the key steps that we already perform in a fairly large Apache Spark application that runs on Amazon EMR.

For our use case the GPX files are not collections of independent records that could be split and analyzed in parallel. Instead, more than 95% of them are treated by our algorithms as a single record (a so-called "Track").

In other words, we would not gain anything by splitting the files, because in the vast majority of cases we would get only one slice per file, defeating the purpose of splitting them in the first place.

GPX files also have another nasty property: they come in two versions (1.0 and 1.1; see http://www.topografix.com/gpx.asp). Important attributes of a point (e.g. speed) are encoded very differently in the two versions, and therefore the parsing logic must differ, at least for some sections of the file.

To recognize the file version, the parser must look at the entire file because this information is available only in the namespace declaration of the root element.
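
For what it's worth, this is roughly how I plan to detect the version once the whole file is in memory (a minimal StAX sketch; the class name is only illustrative):

import java.io.StringReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;

public class GpxVersionDetector {

  // Returns "1.1", "1.0", or null, based on the namespace of the root <gpx> element.
  public static String detectVersion(String gpxXml) throws Exception {
    XMLStreamReader reader =
        XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(gpxXml));
    try {
      while (reader.hasNext()) {
        if (reader.next() == XMLStreamConstants.START_ELEMENT) {
          // The first start element is the root; its namespace identifies the GPX version.
          String ns = reader.getNamespaceURI();
          if ("http://www.topografix.com/GPX/1/1".equals(ns)) return "1.1";
          if ("http://www.topografix.com/GPX/1/0".equals(ns)) return "1.0";
          return null;
        }
      }
      return null;
    } finally {
      reader.close();
    }
  }
}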

On top of all of this I think that, because of their small size and because we read all of them from S3, splitting within the file is not an issue. Can you confirm that?

Going back to my WholeFileInputFormat class, I am worried about setting the unsplittable attribute to true in the constructor. Will the constructor also be invoked when running in a cluster?

Well, I think I really need to set up a small Flink cluster and try it myself :)

Thanks again.
  Andrea.

--
Andrea Cisternino, Erlangen, Germany
LinkedIn: http://www.linkedin.com/in/andreacisternino
GitHub: http://github.com/acisternino

Re: Reading whole files (from S3)

rmetzger0
Hi,
setting the unsplittable attribute in the constructor is fine. The field's value will be sent to the cluster.
So what happens is that you initialize the input format in your client program. Then it is serialized, sent over the network to the machines, and deserialized again. So the value you've set in the constructor will end up on the cluster.
