Reading worker-local input files

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Reading worker-local input files

Robert Schmidtke
Hi everyone,

I'm using Flink and/or Hadoop on my cluster, and I'm having them generate log data in each worker node's /local folder (regular mount point). Now I would like to process these files using Flink, but I'm not quite sure how I could tell Flink to use each worker node's /local folder as input path, because I'd expect Flink to look in the /local folder of the submitting node only. Do I have to put these files into HDFS or is there a way to tell Flink the file:///local file URI refers to worker-local data? Thanks in advance for any hints and best

Robert

--
My GPG Key ID: 336E2680
Reply | Threaded
Open this post in threaded view
|

Re: Reading worker-local input files

Fabian Hueske-2

Hi Robert,

this is indeed a bit tricky to do. The problem is mostly with the generation of the input splits, setup of Flink, and the scheduling of tasks.

1) you have to ensure that on each worker at least one DataSource task is scheduled. The easiest way to do this is to have a bare metal setup (no YARN) and a single TaskManager per worker. Each TM should have the same number of slots and the DataSource should have a parallelism of #TMs * slots. This will ensure that the same number of DataSource tasks is started on each worker.

2) you need to tweak the input split generation. Flink's FileInputFormat assume that it can access all files to be read via a distributed file system. Your InputFormat should have a parameter for the list of taskmanager (hostnames, IP addresses) and the number of slots per TM. The InputFormat.createInputSplits() should create one input split for each parallel task. Each split should have (hostname, local index)

3) you need to tweak the input split assignment. You need to provide a custom input split provider that ensures that splits are only assigned to the correct task manager. Otherwise it might happen that a TaskManager processes the split of another TM and some data is read twice while other data is not read at all.

4) once a split is assigned to a task the InputFormat.open() method is called. Based on the local index, the task should decide which files (or parts of files) it needs to read. This decision must be deterministic (only depend on local index) and ensure that all data (files / parts of files) are read exactly once (you'll need the number of slots per host for that).

You see, this is not trivial. Moreover, such a setup is not flexible, quite fragile, and not fault tolerant.
Since you need to read local files are not available anywhere else, your job will fail if a TM goes down.

If possible, I would recommend to move the data into a distributed file system.

Best,
Fabian

2016-12-27 13:04 GMT+01:00 Robert Schmidtke <[hidden email]>:
Hi everyone,

I'm using Flink and/or Hadoop on my cluster, and I'm having them generate log data in each worker node's /local folder (regular mount point). Now I would like to process these files using Flink, but I'm not quite sure how I could tell Flink to use each worker node's /local folder as input path, because I'd expect Flink to look in the /local folder of the submitting node only. Do I have to put these files into HDFS or is there a way to tell Flink the file:///local file URI refers to worker-local data? Thanks in advance for any hints and best

Robert

--
My GPG Key ID: 336E2680

Reply | Threaded
Open this post in threaded view
|

Re: Reading worker-local input files

Robert Schmidtke
Hi Fabian,

thanks for your directions! They worked flawlessly. I am aware of the reduced robustness, but then again my input is only available on each worker and not replicated. In case anyone is wondering, here is how I did it:

Thanks again!
Robert

On Tue, Dec 27, 2016 at 4:36 PM, Fabian Hueske <[hidden email]> wrote:

Hi Robert,

this is indeed a bit tricky to do. The problem is mostly with the generation of the input splits, setup of Flink, and the scheduling of tasks.

1) you have to ensure that on each worker at least one DataSource task is scheduled. The easiest way to do this is to have a bare metal setup (no YARN) and a single TaskManager per worker. Each TM should have the same number of slots and the DataSource should have a parallelism of #TMs * slots. This will ensure that the same number of DataSource tasks is started on each worker.

2) you need to tweak the input split generation. Flink's FileInputFormat assume that it can access all files to be read via a distributed file system. Your InputFormat should have a parameter for the list of taskmanager (hostnames, IP addresses) and the number of slots per TM. The InputFormat.createInputSplits() should create one input split for each parallel task. Each split should have (hostname, local index)

3) you need to tweak the input split assignment. You need to provide a custom input split provider that ensures that splits are only assigned to the correct task manager. Otherwise it might happen that a TaskManager processes the split of another TM and some data is read twice while other data is not read at all.

4) once a split is assigned to a task the InputFormat.open() method is called. Based on the local index, the task should decide which files (or parts of files) it needs to read. This decision must be deterministic (only depend on local index) and ensure that all data (files / parts of files) are read exactly once (you'll need the number of slots per host for that).

You see, this is not trivial. Moreover, such a setup is not flexible, quite fragile, and not fault tolerant.
Since you need to read local files are not available anywhere else, your job will fail if a TM goes down.

If possible, I would recommend to move the data into a distributed file system.

Best,
Fabian

2016-12-27 13:04 GMT+01:00 Robert Schmidtke <[hidden email]>:
Hi everyone,

I'm using Flink and/or Hadoop on my cluster, and I'm having them generate log data in each worker node's /local folder (regular mount point). Now I would like to process these files using Flink, but I'm not quite sure how I could tell Flink to use each worker node's /local folder as input path, because I'd expect Flink to look in the /local folder of the submitting node only. Do I have to put these files into HDFS or is there a way to tell Flink the file:///local file URI refers to worker-local data? Thanks in advance for any hints and best

Robert

--
My GPG Key ID: 336E2680




--
My GPG Key ID: 336E2680

On Tue, Dec 27, 2016 at 4:36 PM, Fabian Hueske <[hidden email]> wrote:

Hi Robert,

this is indeed a bit tricky to do. The problem is mostly with the generation of the input splits, setup of Flink, and the scheduling of tasks.

1) you have to ensure that on each worker at least one DataSource task is scheduled. The easiest way to do this is to have a bare metal setup (no YARN) and a single TaskManager per worker. Each TM should have the same number of slots and the DataSource should have a parallelism of #TMs * slots. This will ensure that the same number of DataSource tasks is started on each worker.

2) you need to tweak the input split generation. Flink's FileInputFormat assume that it can access all files to be read via a distributed file system. Your InputFormat should have a parameter for the list of taskmanager (hostnames, IP addresses) and the number of slots per TM. The InputFormat.createInputSplits() should create one input split for each parallel task. Each split should have (hostname, local index)

3) you need to tweak the input split assignment. You need to provide a custom input split provider that ensures that splits are only assigned to the correct task manager. Otherwise it might happen that a TaskManager processes the split of another TM and some data is read twice while other data is not read at all.

4) once a split is assigned to a task the InputFormat.open() method is called. Based on the local index, the task should decide which files (or parts of files) it needs to read. This decision must be deterministic (only depend on local index) and ensure that all data (files / parts of files) are read exactly once (you'll need the number of slots per host for that).

You see, this is not trivial. Moreover, such a setup is not flexible, quite fragile, and not fault tolerant.
Since you need to read local files are not available anywhere else, your job will fail if a TM goes down.

If possible, I would recommend to move the data into a distributed file system.

Best,
Fabian

2016-12-27 13:04 GMT+01:00 Robert Schmidtke <[hidden email]>:
Hi everyone,

I'm using Flink and/or Hadoop on my cluster, and I'm having them generate log data in each worker node's /local folder (regular mount point). Now I would like to process these files using Flink, but I'm not quite sure how I could tell Flink to use each worker node's /local folder as input path, because I'd expect Flink to look in the /local folder of the submitting node only. Do I have to put these files into HDFS or is there a way to tell Flink the file:///local file URI refers to worker-local data? Thanks in advance for any hints and best

Robert

--
My GPG Key ID: 336E2680




--
My GPG Key ID: 336E2680