Hi everyone,
I'm using Flink and/or Hadoop on my cluster, and I'm having them generate log data in each worker node's /local folder (a regular mount point). Now I would like to process these files using Flink, but I'm not quite sure how to tell Flink to use each worker node's /local folder as the input path, because I'd expect Flink to look only in the /local folder of the submitting node. Do I have to put these files into HDFS, or is there a way to tell Flink that the file:///local URI refers to worker-local data?

Thanks in advance for any hints, and best,
Robert

My GPG Key ID: 336E2680
Hi Robert,

1) You have to ensure that at least one DataSource task is scheduled on each worker. The easiest way to do this is to have a bare-metal setup (no YARN) and a single TaskManager per worker. Each TM should have the same number of slots, and the DataSource should have a parallelism of #TMs * slots. This ensures that the same number of DataSource tasks is started on each worker.

3) You need to tweak the input split assignment. You need to provide a custom input split provider that ensures that splits are only assigned to the correct TaskManager. Otherwise it might happen that a TaskManager processes the split of another TM, so that some data is read twice while other data is not read at all.

4) Once a split is assigned to a task, the InputFormat.open() method is called. Based on its local index, the task should decide which files (or parts of files) it needs to read. This decision must be deterministic (depend only on the local index) and must ensure that all data (files / parts of files) is read exactly once (you'll need the number of slots per host for that).

As you can see, this is not trivial. Moreover, such a setup is not flexible, quite fragile, and not fault tolerant. Since the local files you need to read are not available anywhere else, your job will fail if a TM goes down. If possible, I would recommend moving the data into a distributed file system.

Best,
Fabian

2016-12-27 13:04 GMT+01:00 Robert Schmidtke <[hidden email]>:
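To illustrate the arithmetic behind points 1) and 4) above, here is a minimal sketch in plain Java. It is not code from this thread and does not use any Flink API: the class name, the method names, and the example file names are all hypothetical, and it assumes each task already knows its local index (0 to slots per host minus 1) and sees the same list of local files as the other tasks on its host.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, for illustration only; not part of Flink.
public class LocalFileAssignment {

    // Point 1): parallelism so that every slot on every worker runs one DataSource task.
    static int sourceParallelism(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // Point 4): files that the task with the given local index should read.
    // Depends only on the sorted file list, the local index and the slot count,
    // so every task on a host reaches the same partitioning independently.
    static List<String> filesForLocalIndex(List<String> localFiles, int localIndex, int slotsPerHost) {
        if (slotsPerHost <= 0 || localIndex < 0 || localIndex >= slotsPerHost) {
            throw new IllegalArgumentException("localIndex must be in [0, slotsPerHost)");
        }
        List<String> sorted = new ArrayList<>(localFiles);
        Collections.sort(sorted); // fixed order, independent of directory listing order
        List<String> mine = new ArrayList<>();
        // Round-robin: file i goes to the task with local index i % slotsPerHost.
        for (int i = localIndex; i < sorted.size(); i += slotsPerHost) {
            mine.add(sorted.get(i));
        }
        return mine;
    }

    public static void main(String[] args) {
        // Assumed example values: 4 TaskManagers with 2 slots each.
        System.out.println(sourceParallelism(4, 2)); // prints 8
        List<String> files = Arrays.asList("c.log", "a.log", "b.log");
        System.out.println(filesForLocalIndex(files, 0, 2)); // prints [a.log, c.log]
        System.out.println(filesForLocalIndex(files, 1, 2)); // prints [b.log]
    }
}
```

Across all local indices of a host, each file appears in exactly one task's list, which is the "read exactly once" property asked for in 4). How a task obtains its local index, and how the split assignment from 3) is wired in, is not covered by this sketch.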
Hi Fabian,

thanks for your directions! They worked flawlessly. I am aware of the reduced robustness, but then again my input is only available on each worker and not replicated. In case anyone is wondering, here is how I did it:

Thanks again!
Robert

On Tue, Dec 27, 2016 at 4:36 PM, Fabian Hueske <[hidden email]> wrote: