Maintaining data locality with list of paths (strings) as input


Maintaining data locality with list of paths (strings) as input

Guy Rapaport
Hello,

Here's a use case I'd like to implement, and I wonder if Flink is the answer:

My input is a file containing a list of paths.
(It's actually a message queue with incoming messages, each containing a path, but let's use a simpler use-case.)

Each path points at a file stored on HDFS. The files are rather small so although they are replicated, they are not broken into chunks.

I want each file to get processed on the node on which it is stored, for the sake of data locality.
However, if I run such a job on Spark, the input path is handed to some node, which then has to pull the file from HDFS - no data locality, just network congestion.

Can Flink solve this problem for me?

Note: I saw similar examples in which file lists are processed on Spark... by having each file in the list downloaded from the internet to the node processing it. That's not my use case - I already have the files on HDFS; all I want is to enjoy data locality in a cluster environment!

Thanks,
Guy.

RE: Maintaining data locality with list of paths (strings) as input

Emmanuel
Hi Guy,

I don't have an answer about Flink, but here are a couple of comments on your use case that I hope might help:

- You should view HDFS as a giant RAID across nodes: the namenode maintains the file table, but the data is distributed and replicated across nodes by block. There is no 'data locality' guarantee: the data is distributed and replicated, so it could be spread across many nodes.

- Small files on HDFS are not a good idea because the typical minimum block size is 64MB, which means even if your file is 1kB, it will use 64MB on disk. It is best to aggregate those small files into a big one or use a DB store like HBase or Cassandra.
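
For the aggregation route, here is a minimal sketch (assuming the Hadoop 2.x SequenceFile API; the output path and argument handling are illustrative) that packs many small HDFS files into one SequenceFile, keyed by their original paths:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;

public class PackSmallFiles {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path("/data/images-packed.seq");   // illustrative output path

        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(out),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class))) {
            for (String p : args) {                        // small HDFS files to pack
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                try (InputStream in = fs.open(new Path(p))) {
                    IOUtils.copyBytes(in, buf, conf, false);
                }
                // key = original path, value = raw file contents
                writer.append(new Text(p), new BytesWritable(buf.toByteArray()));
            }
        }
    }
}

A SequenceFile is splittable, so the packed file can later be processed in parallel without reintroducing the small-files problem.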

In Spark, you can load files from the local file system, but usually that requires the files to be present on each node, which defeats the purpose.


Emmanuel

Re: Maintaining data locality with list of paths (strings) as input

Stephan Ewen
Hi Guy,

This sounds like a use case that should work with Flink.

When it comes to input handling, Flink differs a bit from Spark. Flink creates a set of input tasks and a set of input splits. The splits are then assigned to the tasks on the fly. Each task may work on multiple input splits, which are pulled from the JobManager. The input localization also happens on the fly, as each input task requests the next input split to process.

It should be possible to make sure this is executed in a data-local way, if the input splits expose their locations properly.
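
For reference, here is a minimal sketch of how splits could carry such location hints. It is not a full InputFormat; it assumes Flink's LocatableInputSplit plus Hadoop's FileSystem API, and the path list and split numbering are illustrative:

import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.util.ArrayList;
import java.util.List;

public class LocalityAwareSplits {

    // Build one split per (small) HDFS file, tagged with the hosts that store its first block.
    public static LocatableInputSplit[] splitsFor(List<String> hdfsPaths) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        List<LocatableInputSplit> splits = new ArrayList<LocatableInputSplit>();
        int splitNumber = 0;
        for (String p : hdfsPaths) {
            FileStatus status = fs.getFileStatus(new Path(p));
            BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
            String[] hosts = blocks.length > 0 ? blocks[0].getHosts() : new String[0];
            // The split carries the hostnames, so the split assigner can prefer local tasks.
            splits.add(new LocatableInputSplit(splitNumber++, hosts));
        }
        return splits.toArray(new LocatableInputSplit[splits.size()]);
    }
}

Roughly, a custom InputFormat would return such splits from its createInputSplits() method and read the file contents in its record-reading methods; Flink's locality-aware split assigner (LocatableInputSplitAssigner, if I recall the name correctly) can then hand each split preferentially to a task running on one of the listed hosts.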

If you share a bit more information about what your input format looks like, we can definitely help you realize that.

As a side comment: small files in HDFS do not take up 64 megabytes on disk - there must be some confusion. In early versions of HDFS, there was a big problem with small files, because the NameNode was getting overloaded with metadata. This has gotten better, as far as I know, but I am not sure whether it has been fully solved.

Greetings,
Stephan

RE: Maintaining data locality with list of paths (strings) as input

Emmanuel
I stand corrected: a file does not take up an entire block, according to Hadoop: The Definitive Guide. As Stephan mentioned, it takes space in the namenode (roughly 150 bytes of namenode memory per object, as I read). So the limitation with many small files is the capacity of the namenode.

Still, when the data is replicated it is distributed by HDFS; I don't see how there can be any guarantee of data locality. It's possible to figure out where the data is physically located, but I'm not sure it can be predicted.

The whole idea behind Hadoop is to bring the computation to the data, and that's what YARN does in distributing processing tasks. (Yahoo Dev network explains this well: https://developer.yahoo.com/hadoop/tutorial/module1.html)

Is the Flink behavior mentioned native, or is it something that happens only when running Flink on YARN?

Re: Maintaining data locality with list of paths (strings) as input

Guy Rapaport
In reply to this post by Stephan Ewen
Hi Stephan,

The case is this: I have lots of images stored on a cluster, and I want to create a system in which I send a message to a message queue (let's say Apache Kafka), the message is consumed within the cluster, and the image it refers to gets processed. The message contains the ID of one of the images (or even its full path).

That said, since I already have an operational Hadoop cluster, I thought I'd use Hadoop for this job, with Spark Streaming reading from my message queue and processing the incoming messages, handling images located on HDFS. I wanted to enjoy data locality - i.e., since all of my nodes are both HDFS DataNodes and YARN workers, I wanted the worker that stores an image to be the one that processes it.

As I mentioned, the thing with Spark Streaming is that upon an incoming message, the received path is passed by value (as a string) to one of the workers as its task. The worker reads the path - but it might point at an image that is not stored on that worker at all, so the worker has to fetch it over the network.

Again - I'm not sure whether Flink is the right tool in the Hadoop ecosystem, or even whether Hadoop is the right choice in the big-data ecosystem. I just need something that can ensure my data gets processed locally, even if all I have as input is a list of HDFS paths.

Thanks,
g.



Re: Maintaining data locality with list of paths (strings) as input

rmetzger0
Hi,

@Emmanuel:
"Is the Flink behavior mentioned native or is this something happening when running Flink on YARN?"

The input split assignment behavior Stephan described is implemented in Flink itself, so it works both in a standalone Flink cluster and in a YARN setup.
In a setup where each machine running an HDFS DataNode also runs a Flink TaskManager, almost all reads will happen locally.
The implementation of this is relatively easy: the HDFS client has a method to retrieve the hosts where the blocks of a file are located.

@Guy:
What you could do is a Flink topology like this:
(Kafka source) --> (map each path to the host that stores the image locally) --> /broadcast to all workers/ --> (process the path on the matching worker).

The first mapper can just use the standard HDFS client to retrieve the location (i.e., the hostname) of the block. Then you pack the <hostname, imagepath> pair into a Tuple2 and broadcast it to all workers.
Only the worker whose hostname matches is allowed to process the image.
This approach would issue an RPC call to the HDFS namenode for each incoming path. Also, if the DataNode on a worker goes down, Flink would need to fall back to a remote read. But that case shouldn't be too common.
A good optimization for this approach would be a custom partitioner in the Streaming API that partitions records by host instead of broadcasting them.
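
For what it's worth, here is a rough sketch of that broadcast-and-filter topology in the Java DataStream API. The Kafka source is replaced by a placeholder stream of paths, the image-processing step is just a print(), and a real job would cache the HDFS client (e.g., in a RichMapFunction) and make sure the hostnames reported by HDFS match what the TaskManagers see:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.InetAddress;

public class LocalImageProcessing {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the Kafka source: a stream of HDFS image paths (illustrative).
        DataStream<String> paths = env.fromElements(
                "hdfs:///images/a.jpg", "hdfs:///images/b.jpg");

        paths
            // Map each path to (host-of-first-block, path) by asking the HDFS namenode.
            .map(new MapFunction<String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(String path) throws Exception {
                    // A real job would cache the FileSystem instead of recreating it per record.
                    FileSystem fs = FileSystem.get(new Configuration());
                    FileStatus status = fs.getFileStatus(new Path(path));
                    BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
                    String host = (blocks.length > 0 && blocks[0].getHosts().length > 0)
                            ? blocks[0].getHosts()[0] : "unknown";
                    return new Tuple2<String, String>(host, path);
                }
            })
            // Send every (host, path) record to all parallel workers ...
            .broadcast()
            // ... and let only the worker whose hostname matches keep it.
            .filter(new FilterFunction<Tuple2<String, String>>() {
                @Override
                public boolean filter(Tuple2<String, String> record) throws Exception {
                    return record.f0.equals(InetAddress.getLocalHost().getHostName());
                }
            })
            // Placeholder for the actual image processing on the local replica.
            .print();

        env.execute("locality-aware image processing (sketch)");
    }
}

Note that broadcast() sends every record to every parallel worker, so the per-record overhead grows with the parallelism; the custom partitioner mentioned above would route each record only to its host.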






