Hi Helmut,
In fact this is possible with the DataSet API. However, AFAIK it is an undocumented feature and probably not widely used.
You can do this by specifying so-called SplitDataProperties on a DataSource as follows:
DataSource<T> src = env.createInput(...);
SplitDataProperties<T> splitProps = src.getSplitDataProperties();
These SplitDataProperties can be used to declare how data is organized within InputSplits (Flink's unit of distributing input data across source operators).
For example you can declare that the data in all split is sorted ascendingly on the second attribute (index 1) as follows:
splitProps.splitsOrderedBy(new int[]{1}, new Orders[]{Order.ASCENDING});
Other properties are grouping properties (a similar but weaker property then order) and partitioning across splits.
The optimizer will take this information into consideration when chosing the execution strategies for the DataSet operators.
However, you need to make sure that all provided information is 100% correct. Otherwise, you might end up with an execution plan that does not compute the correct result.
It might also make sense to tweak the input split generation (generate only one split per file) in order to be able to provide more split properties.
Hope this helps,
Fabian