Thanks for your reply. Maybe I should have given more details. val stream: DataStream[(String, Int)] is already windowed. Ideally I have all the data I need in my data stream; all I'm trying to do is build something like a HashMap[String, Int] from the (String, Int) tuples. If reduce is not the best solution, can you please suggest another way to do the same?
val source: DataStream[(String, Int)] = someSource
val stream = source.keyBy(_._1).window(TumblingEventTimeWindows.of(Time.minutes(xmin))).apply { (x: String, y: TimeWindow, z: Iterable[(String, Int)], w: Collector[(String, Int)]) => mywindowfunc(x, y, z, w) }
val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
keyedStream.print()
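
To make the goal concrete, here is a minimal plain-Scala sketch (no Flink involved) of the result I am after for the contents of one window. The object name TuplesToMap, the helper toMap, and the sample data are hypothetical, and summing the values of duplicate keys is an assumption; the merge rule could just as well be "last value wins".

```scala
// Plain Scala illustration only; nothing here uses the Flink API.
object TuplesToMap {
  // Fold (key, count) tuples into a single immutable Map.
  // Assumption: values for duplicate keys are summed.
  def toMap(tuples: Iterable[(String, Int)]): Map[String, Int] =
    tuples.foldLeft(Map.empty[String, Int]) { case (acc, (key, value)) =>
      acc.updated(key, acc.getOrElse(key, 0) + value)
    }

  def main(args: Array[String]): Unit = {
    // Made-up sample standing in for the tuples of one window.
    val sample = Seq(("a", 1), ("b", 2), ("a", 3))
    println(toMap(sample))
  }
}
```

For the sample above, the resulting map holds a -> 4 and b -> 2. What I am looking for is the streaming equivalent of this fold.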
Balaji