Sorting in datastream

Sorting in datastream

subashbasnet
Hello all, 

I found the sortPartition() function in the DataSet API for ordering the elements of a DataSet, as below: 
DataSet<Tuple2<Integer, String>> data;
DataSet<Tuple2<Integer, String>> partitionedData = data.sortPartition(0, Order.DESCENDING);
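
For comparison, sortPartition() orders elements only within each parallel partition; it does not produce one global order across partitions. This plain-Java sketch (no Flink dependency; Tuple2 is modelled as Map.Entry, and the two hard-coded partitions are hypothetical) mimics sortPartition(0, Order.DESCENDING) on data that is already split into partitions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LocalPartitionSort {
    // Sorts each partition on its own by key (field 0), descending,
    // mirroring sortPartition(0, Order.DESCENDING). No order is imposed across partitions.
    static List<List<Map.Entry<Integer, String>>> sortEachPartitionDesc(
            List<List<Map.Entry<Integer, String>>> partitions) {
        List<List<Map.Entry<Integer, String>>> result = new ArrayList<>();
        for (List<Map.Entry<Integer, String>> partition : partitions) {
            List<Map.Entry<Integer, String>> sorted = new ArrayList<>(partition);
            sorted.sort(Map.Entry.<Integer, String>comparingByKey().reversed());
            result.add(sorted);
        }
        return result;
    }

    public static void main(String[] args) {
        // Two hypothetical partitions, as if the DataSet ran with parallelism 2.
        List<List<Map.Entry<Integer, String>>> partitions = List.of(
                List.of(Map.entry(1, "a"), Map.entry(5, "b"), Map.entry(3, "c")),
                List.of(Map.entry(4, "d"), Map.entry(2, "e")));
        System.out.println(sortEachPartitionDesc(partitions));
    }
}
```

Partition 0 becomes 5, 3, 1 and partition 1 becomes 4, 2; a single global descending order would instead be 5, 4, 3, 2, 1.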

But I couldn't find any method for sorting the elements of a DataStream. 
DataStream<Tuple2<Integer, String>> data;
DataStream<Tuple2<Integer, String>> partitionedData =data.??

What would be the way to sort the elements of a DataStream? 


Best Regards,
Subash Basnet

Re: Sorting in datastream

Stephan Ewen
Hi!

Data streams are infinite. It's quite hard to sort something infinite ;-) That's why the operation does not exist on DataStream.
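
To make the point concrete, here is a minimal plain-Java sketch (not Flink code; the generator is a hypothetical stand-in for an unbounded source). A sort can only emit its first element after it has seen the last one, so sorting becomes possible only once the input is cut to a finite size:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BoundedSort {
    // A hypothetical unbounded source: an endless pseudo-random sequence in [0, 100).
    static Stream<Integer> unboundedSource() {
        return Stream.iterate(7, x -> (x * 31 + 11) % 100);
    }

    // Sorting becomes possible once the input is bounded, here by keeping the first n elements.
    // Consuming unboundedSource().sorted() without limit(...) would never finish,
    // because sorted() has to see every element before it can emit the smallest one.
    static List<Integer> sortFirstN(int n) {
        return unboundedSource().limit(n).sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The first five values are 7, 28, 79, 60, 71.
        System.out.println(sortFirstN(5)); // prints [7, 28, 60, 71, 79]
    }
}
```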

Stephan

On Wed, Aug 17, 2016 at 6:22 PM, subash basnet wrote: [...]

Re: Sorting in datastream

subashbasnet
Hello Stephan, 

Okay, then I suppose that's also the reason why there is no count() function on DataStream. 
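
A minimal plain-Java sketch of the usual streaming alternative, assuming intermediate results are acceptable: instead of one final count, which would need the end of the input, emit the count seen so far after every element. The names runningCount and RunningCount are hypothetical and not part of the Flink API:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.stream.Stream;

public class RunningCount {
    // Emits the number of elements seen so far after each element.
    // On an unbounded source this never returns, but every emitted value is a valid "count so far".
    static void runningCount(Iterator<?> source, LongConsumer out) {
        long count = 0;
        while (source.hasNext()) {
            source.next();
            count++;
            out.accept(count);
        }
    }

    public static void main(String[] args) {
        List<Long> emitted = new ArrayList<>();
        // A finite prefix of a (conceptually endless) stream of events, for demonstration only.
        Iterator<String> events = Stream.generate(() -> "event").limit(4).iterator();
        runningCount(events, emitted::add);
        System.out.println(emitted); // prints [1, 2, 3, 4]
    }
}
```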


Regards,
Subash

On Wed, Aug 17, 2016 at 6:26 PM, Stephan Ewen wrote: [...]

Re: Sorting in datastream

LiZhe
Hi subashbasnet!

I think you can apply a WindowFunction to the DataStream and sort the elements inside it: convert the window's input (an Iterable[T]) to a List[T], and then call the list's sortBy(...) method to sort it.

For example:

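import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Sorts the elements of one window by col1 and emits them in that order.
// Assumes String keys, time windows, and a StockTransaction type with an orderable col1.
class SortByCol1 extends WindowFunction[StockTransaction, StockTransaction, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[StockTransaction],
                     out: Collector[StockTransaction]): Unit = {
    input.toList.sortBy(_.col1).foreach(out.collect) // the whole window is held in memory
  }
}

// ds: DataStream[StockTransaction]; the key and the window assigner are left open
ds.keyBy(key).window(...).apply(new SortByCol1)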
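
The same idea can be tried without a Flink cluster. This plain-Java sketch (a hypothetical Txn record stands in for StockTransaction; count-based windows are chosen only for simplicity) roughly imitates keyBy(...).countWindow(size) followed by a sorting window function: elements are buffered per key, and each full window is sorted on its own. Only the order inside a window is guaranteed, not across windows.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowedSort {
    // Hypothetical stand-in for StockTransaction: a grouping key and the field to sort by.
    record Txn(String key, int col1) {}

    // Roughly imitates keyBy(key).countWindow(size) plus a sorting window function:
    // elements are buffered per key, and whenever a key's buffer holds `size` elements
    // that window is sorted by col1, emitted, and cleared (tumbling windows).
    static List<List<Txn>> sortPerWindow(Iterable<Txn> input, int size) {
        Map<String, List<Txn>> buffers = new HashMap<>();
        List<List<Txn>> emitted = new ArrayList<>();
        for (Txn t : input) {
            List<Txn> buf = buffers.computeIfAbsent(t.key(), k -> new ArrayList<>());
            buf.add(t);
            if (buf.size() == size) {
                buf.sort(Comparator.comparingInt(Txn::col1)); // sort only this finite window
                emitted.add(new ArrayList<>(buf));
                buf.clear(); // the next window for this key starts empty
            }
        }
        return emitted; // incomplete windows stay buffered and are not emitted
    }

    public static void main(String[] args) {
        List<Txn> input = List.of(new Txn("A", 3), new Txn("B", 9), new Txn("A", 1),
                new Txn("A", 2), new Txn("B", 4));
        System.out.println(sortPerWindow(input, 2));
    }
}
```

With a window size of 2, key A's first window is emitted as col1 values 1, 3 and key B's as 4, 9; the third A element (col1 = 2) stays buffered until another A element arrives.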

Hope it's helpful for you.