Fwd: Processing Sorted Input Datasets

Helmut Zechmann
Hi all,

We want to use Flink batch processing to merge records from two or more datasets using groupBy.
The input datasets are already sorted, since they were written out in sorted order by another job.

Is it possible to tell Flink that it does not need to sort the data again?

Best,

Helmut

Re: Processing Sorted Input Datasets

Fabian Hueske-2
Hi Helmut,

In fact this is possible with the DataSet API. However, AFAIK it is an undocumented feature and probably not widely used.
You can do this by specifying so-called SplitDataProperties on a DataSource as follows:

DataSource<T> src = env.createInput(...);
SplitDataProperties<T> splitProps = src.getSplitDataProperties();

These SplitDataProperties can be used to declare how data is organized within InputSplits (Flink's unit of distributing input data across source operators).
For example, you can declare that the data in every split is sorted in ascending order on the second attribute (index 1) as follows:

splitProps.splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});

Other properties are grouping (a similar but weaker property than ordering) and partitioning across splits.

The optimizer will take this information into consideration when choosing the execution strategies for the DataSet operators.
However, you need to make sure that all provided information is 100% correct. Otherwise, you might end up with an execution plan that does not compute the correct result.
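
To illustrate why the declared properties have to be correct, here is a minimal sketch in plain Java. It is not Flink code and does not reflect how the optimizer is implemented; the class and method names (SortedGroupingSketch, sumAdjacentGroups, format) are made up for this example. It sums values per key in a single pass without sorting, which is only valid if records with equal keys are adjacent in the input:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class SortedGroupingSketch {

    // Sums the value (index 1) per key (index 0) in one pass WITHOUT sorting.
    // This is only correct if records with equal keys are adjacent, which is
    // the kind of guarantee a declared order or grouping property stands for.
    public static List<long[]> sumAdjacentGroups(long[][] records) {
        List<long[]> result = new ArrayList<>();
        for (long[] rec : records) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && last[0] == rec[0]) {
                last[1] += rec[1];                          // same key as previous record
            } else {
                result.add(new long[]{rec[0], rec[1]});     // a new group starts here
            }
        }
        return result;
    }

    // Renders groups as "key=sum, key=sum, ...".
    public static String format(List<long[]> groups) {
        return groups.stream()
                .map(g -> g[0] + "=" + g[1])
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        long[][] sorted   = {{1, 10}, {1, 20}, {2, 5}};  // ordered by key
        long[][] grouped  = {{2, 5}, {1, 10}, {1, 20}};  // grouped, but not ordered
        long[][] unsorted = {{1, 10}, {2, 5}, {1, 20}};  // neither: declaration would be false

        System.out.println(format(sumAdjacentGroups(sorted)));   // 1=30, 2=5
        System.out.println(format(sumAdjacentGroups(grouped)));  // 2=5, 1=30
        System.out.println(format(sumAdjacentGroups(unsorted))); // 1=10, 2=5, 1=20
    }
}
```

The third input yields two separate groups for key 1, which is the kind of silently wrong result to expect when a declared property does not hold. The second input shows why grouping is the weaker property: equal keys are adjacent, so the single pass still works, even though the keys are not in order.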
It might also make sense to tweak the input split generation (for example, generating only one split per file) so that you can declare more split properties.

Hope this helps,
Fabian



2018-05-08 17:37 GMT+02:00 Helmut Zechmann <[hidden email]>:
Hi all,

We want to use Flink batch processing to merge records from two or more datasets using groupBy.
The input datasets are already sorted, since they were written out in sorted order by another job.

Is it possible to tell Flink that it does not need to sort the data again?

Best,

Helmut