Reading separate files in parallel tasks as input


Dániel Bali
Hello!

We are running an experiment on a cluster and we have a large input split into multiple files. We'd like to run a Flink job that reads the local file on each instance and processes it. Is there a way to do this in the batch environment? `readTextFile` wants to read the file on the JobManager and split it right there, which is not what we want.

We solved it in the streaming environment by using `addSource`, but there is no similar function in the batch version. Does anybody know how this could be done?

Thanks!
Daniel

Re: Reading separate files in parallel tasks as input

Márton Balassi
Hi Dani,

The batch API does not expose an `addSource`-like method, but you can always write your own InputFormat and pass it directly to the constructor of the DataSource. DataSource extends DataSet, so you get all the usual methods in the end. For an example, you can have a look here. [1]


Best,

Marton
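
For illustration, a minimal sketch of that wiring (using the Java batch API; `MyInputFormat` is a hypothetical stand-in for your own implementation, and `env.createInput` is the convenience method that builds the DataSource for you):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class CustomInputJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // createInput wraps the format in a DataSource. Since DataSource
        // extends DataSet, all the usual transformations are available.
        DataSet<String> input = env.createInput(new MyInputFormat());

        input.print();
    }
}
```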

Re: Reading separate files in parallel tasks as input

Dániel Bali
Hi Márton,

Thanks for the reply! I suppose I have to implement `createInputSplits` too, then. I tried looking at the documentation for the InputFormat interface, but I can't see how I could force it to load separate files on separate TaskManagers instead of one file on the JobManager. Where is this behavior decided? Or am I misunderstanding something about how this all works?

Cheers,
Daniel

Re: Reading separate files in parallel tasks as input

rmetzger0
Hi Daniel,

Are the files in HDFS?
What exactly do you mean by "`readTextFile` wants to read the file on the JobManager"?
The JobManager does not read input files.
Also, Flink assigns input splits locally (when reading from distributed file systems). In the JobManager log you can see how many splits are assigned locally and how many require remote reads. Usually the number of remote reads is very low.

Re: Reading separate files in parallel tasks as input

Dániel Bali
Hi Robert,

We are not using HDFS. We have a large file that's already split into 8 parts, each on a node that runs a separate TaskManager, at the same path and with the same name. The JobManager is on another node. If I start a job that uses `readTextFile`, I get an exception saying that the input file was not found and the splits could not be created. (The exception disappears if I create an empty file with the given name on the JobManager node.)

What I'd like is to read a different file on each node and process it. Is there a way to do this?

Thanks,
Daniel

Re: Reading separate files in parallel tasks as input

Fabian Hueske-2
Hi,

Reading local files in a distributed setting is tricky because Flink assumes that all InputSplits can be read from all TaskManagers. This is obviously not possible if the files are located on the local file systems of different physical machines. Hence, you cannot use one of the provided FileInputFormats to read local files in a distributed setup.

Instead, you need to implement your own InputFormat, which can extend any of the other file-based input formats. In addition, your InputFormat needs to implement the StrictlyLocalAssignment interface, and you need to override the createInputSplits method. Your InputFormat must create FileInputSplits, and each split must be local to exactly one host. You might need to override the getStatistics method as well, to avoid the empty-file-on-JobManager problem.

Internally, the JobManager will make sure that InputSplits are only assigned to a TaskManager that runs on "their" host.

Let me know if you have further questions.
Best, Fabian
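
For illustration, a minimal sketch of what Fabian describes, against the Flink 0.9 API (the host names and the assumption that every worker stores its part at the same local path are placeholders for your setup):

```java
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.StrictlyLocalAssignment;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class StrictlyLocalTextInputFormat extends TextInputFormat
        implements StrictlyLocalAssignment {

    // Placeholder host names: the TaskManager machines that each hold
    // one part of the input at the same local path.
    private static final String[] HOSTS = { "node1", "node2", "node3", "node4" };

    public StrictlyLocalTextInputFormat(Path filePath) {
        super(filePath);
    }

    @Override
    public FileInputSplit[] createInputSplits(int minNumSplits) {
        // One split per host, each pinned to exactly one machine.
        // Length -1 (READ_WHOLE_SPLIT_FLAG) means "read the whole file".
        FileInputSplit[] splits = new FileInputSplit[HOSTS.length];
        for (int i = 0; i < HOSTS.length; i++) {
            splits[i] = new FileInputSplit(i, getFilePath(), 0, -1,
                    new String[] { HOSTS[i] });
        }
        return splits;
    }

    @Override
    public FileInputFormat.FileBaseStatistics getStatistics(BaseStatistics cachedStats) {
        // Returning null (no statistics) keeps the JobManager from trying
        // to stat a file that only exists on the workers.
        return null;
    }
}
```

The job could then use it via something like `env.createInput(new StrictlyLocalTextInputFormat(new Path("file:///data/input/part.txt")))`, with the path again being a placeholder.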

Re: Reading separate files in parallel tasks as input

Stephan Ewen
It sounds like what you want to do could also be done in the same way as "addSource()" with a GenericInputFormat.

It would not look like a FileInputFormat to Flink, and the JobManager would assign one generic meaningless split to each parallel instance.

Inside the input format, you could still internally open local file streams.

Greetings,
Stephan
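
For illustration, a sketch of that variant (the local path is a placeholder; each parallel instance receives one generic split and opens its node-local copy of the file):

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.core.io.GenericInputSplit;

public class LocalFileGenericInputFormat extends GenericInputFormat<String> {

    // Placeholder: the path at which every node stores its own part.
    private static final String LOCAL_PATH = "/data/input/part.txt";

    private transient BufferedReader reader;
    private transient String nextLine;

    @Override
    public void open(GenericInputSplit split) throws IOException {
        super.open(split);
        // Each parallel instance opens the file on its own machine.
        reader = new BufferedReader(new FileReader(LOCAL_PATH));
        nextLine = reader.readLine();
    }

    @Override
    public boolean reachedEnd() {
        return nextLine == null;
    }

    @Override
    public String nextRecord(String reuse) throws IOException {
        String current = nextLine;
        nextLine = reader.readLine();
        return current;
    }

    @Override
    public void close() throws IOException {
        if (reader != null) {
            reader.close();
        }
    }
}
```

Running it with, say, `env.createInput(new LocalFileGenericInputFormat()).setParallelism(8)` would give one reader per parallel instance; with eight TaskManagers of one slot each, that is one reader per node, each processing its local part.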

