Fwd: Flink loading an S3 File out of order

Fwd: Flink loading an S3 File out of order

Benjamin Kadish
I am trying to read a file from S3 in the correct order. It seems that Flink is downloading the file out of order, or at least it's constructing the DataSet out of order. I tried using Hadoop to download the file and it seemed to download it in order.
I am able to reproduce the problem with the following line:
env.readTextFileWithValue(conf.options.get(S3FileName).get)
   .writeAsText(s"${conf.output}/output",writeMode = FileSystem.WriteMode.OVERWRITE)
The output looks something like

line 1001
line 1002
...
line 1304
line 1

Is there a way to guarantee order?

--
Benjamin Kadish
(260) 441-6159
Re: Flink loading an S3 File out of order

Fabian Hueske-2
Hi Benjamin,

Flink usually reads data in parallel. This is done by splitting the input (e.g., a file) into several input splits, each of which is processed independently. Since splits are usually processed concurrently by more than one task, Flink does not preserve their order by default.
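
To make the effect concrete, here is a toy model in plain Scala (no Flink involved). The names makeSplits and collect, the split size, and the completion order are all made up for illustration; this is not how Flink is implemented. It only shows why lines stay ordered inside a split while the splits themselves can arrive in any order.

```scala
// Toy model, plain Scala: NOT Flink code, all names are hypothetical.
object SplitOrderDemo {
  // Cut the input into fixed-size chunks, similar in spirit to input splits.
  def makeSplits(lines: Seq[String], splitSize: Int): Seq[Seq[String]] =
    lines.grouped(splitSize).toVector

  // Concatenate the splits in the order in which tasks happened to finish them.
  def collect(splits: Seq[Seq[String]], completionOrder: Seq[Int]): Seq[String] =
    completionOrder.flatMap(i => splits(i))

  def main(args: Array[String]): Unit = {
    val lines = (1 to 6).map(i => s"line $i")
    val splits = makeSplits(lines, 2)
    // Assumed completion order: the last split finishes first.
    collect(splits, Seq(2, 0, 1)).foreach(println)
  }
}
```

With this assumed completion order the last two lines come out first, followed by lines 1 to 4, which is the same pattern as in your output.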

You can implement a special InputFormat that uses a custom InputSplitAssigner to ensure that splits are handed out in order.
This would require a bit of coding, though.

A DataSet is usually distributed among multiple partitions/tasks and also has no concept of (complete) order. It is possible to sort the data of a data set within each individual partition by calling DataSet.sortPartition(key, order). If you do that with a parallelism of one (DataSet.sortPartition().setParallelism(1)), you'll have a fully ordered data set, but only on one machine.
Flink also supports range partitioning (DataSet.partitionByRange()) in case you want to sort the data in parallel.
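
A minimal sketch of both options with the Scala DataSet API might look like the following. It assumes that every record carries something to sort on; the lineNumber helper and the "line <n>" record format are assumptions taken from the sample output, and fromElements merely stands in for the S3 input. If the records contain no sortable key, sorting cannot restore the original file order.

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

object SortedLines {
  // Hypothetical key extractor: assumes every record looks like "line <n>".
  def lineNumber(line: String): Int = line.trim.split("\\s+").last.toInt

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the S3 input; records may arrive in any order.
    val lines: DataSet[String] = env.fromElements("line 1001", "line 1002", "line 1")

    // Attach the sort key as tuple field 0.
    val keyed = lines.map(l => (lineNumber(l), l))

    // Option 1: full order on a single task (sort one partition, parallelism 1).
    val fullyOrdered = keyed
      .sortPartition(0, Order.ASCENDING)
      .setParallelism(1)

    // Option 2: range-partition on the key, then sort each partition locally.
    // Defined for illustration only; no sink is attached, so it is not executed.
    val sortedInParallel = keyed
      .partitionByRange(0)
      .sortPartition(0, Order.ASCENDING)

    // Drop the key again and print from a single task.
    fullyOrdered.map(_._2).setParallelism(1).print()
  }
}
```

Option 1 is the simpler one but funnels all data through one task. Option 2 keeps the sort parallel, at the cost of producing one sorted partition per task rather than a single ordered stream.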

Best, Fabian

2016-03-10 16:52 GMT+01:00 Benjamin Kadish <[hidden email]>:
I am trying to read a file from S3 in the correct order. It seems that Flink is downloading the file out of order, or at least it's constructing the DataSet out of order. I tried using Hadoop to download the file and it seemed to download it in order.
I am able to reproduce the problem with the following line:
env.readTextFileWithValue(conf.options.get(S3FileName).get)
   .writeAsText(s"${conf.output}/output",writeMode = FileSystem.WriteMode.OVERWRITE)
The output looks something like

line 1001
line 1002
...
line 1304
line 1

Is there a way to guarantee order?

--
Benjamin Kadish
(260) 441-6159