Till,

Thanks for your reply; maybe I should have given more details. My DataStream[(String, Int)] is already windowed. I have all the data I need in my data stream, and all I'm trying to do is build something like a HashMap[String, Int] from the (String, Int) tuples. If reduce is not the best solution, can you please suggest another way to do the same?

    val source: DataStream[(String, Int)] = someSource
    val stream = source
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.minutes(xmin)))
      .apply { (key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]) =>
        mywindowfunc(key, window, input, out)
      }
    val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
    keyedStream.print()

Balaji

On Thu, Mar 24, 2016 at 11:21 PM, Till Rohrmann <[hidden email]> wrote:

Hi Balaji,

the output you see is correct, since you're computing a continuous reduce over the incoming data. Because you haven't defined a time frame for your reduce computation, you would either have to wait for all eternity to output the final result, or output each new reduce result as soon as it is generated (which is, of course, partial). Since the first option is not very practical, Flink emits the partial reduce results.

Cheers,
Till

On Thu, Mar 24, 2016 at 6:21 PM, Balaji Rajagopalan <[hidden email]> wrote:

I have a keyed input stream on a DataStream[(String, Int)] and wrote a reduce on the keyed stream. The reduce is a simple one that sums up the integer values of the same key.

    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.streaming.api.scala._

    val stream: DataStream[(String, Int)] = ...
    val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
    keyedStream.print()

    // Sums the counts of two records that share the same key.
    class MyReduceFunction extends ReduceFunction[(String, Int)] {
      override def reduce(in: (String, Int), in1: (String, Int)): (String, Int) =
        (in._1, in._2 + in1._2)
    }

Here is my sample input stream:

    ("k1",1) ("k1",1) ("k2",1)

I was expecting the output of the above program to be

    ("k1",2) ("k2",1)

whereas I got this:

    ("k1",1) ("k1",2) ("k2",1)

Isn't this incorrect output?

Balaji
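To make Till's explanation concrete, here is a minimal Flink-free simulation in plain Scala. The object and function names are hypothetical and are not Flink API. `rollingReduce` mimics a continuous `keyBy(...).reduce(...)`, which emits an updated sum for every incoming record; `windowedReduce` mimics a reduce over one finite window, which emits a single final sum per key when the window closes. In Flink itself, the second behaviour would presumably come from reducing on the windowed stream (for example `keyBy(_._1).window(...).reduce(new MyReduceFunction)`) rather than from a second continuous reduce after the window; treat that as a suggestion to check against your Flink version.

```scala
// Hypothetical, Flink-free simulation of the two reduce behaviours in this thread.
object RollingVsWindowedReduce {
  type Rec = (String, Int)

  // Same logic as MyReduceFunction: keep the key, sum the counts.
  val sum: (Rec, Rec) => Rec = (a, b) => (a._1, a._2 + b._2)

  // Continuous (rolling) reduce on a keyed stream: every incoming record
  // updates the per-key state, and the updated (partial) result is emitted.
  def rollingReduce(input: List[Rec]): List[Rec] = {
    val state = scala.collection.mutable.Map.empty[String, Rec]
    input.map { rec =>
      val updated = state.get(rec._1).map(sum(_, rec)).getOrElse(rec)
      state(rec._1) = updated
      updated
    }
  }

  // Windowed reduce: the whole input is treated as one finite window,
  // so exactly one final result per key is emitted.
  def windowedReduce(input: List[Rec]): List[Rec] =
    input.groupBy(_._1).values.map(_.reduce(sum)).toList.sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val input = List(("k1", 1), ("k1", 1), ("k2", 1))
    println(rollingReduce(input))  // List((k1,1), (k1,2), (k2,1))
    println(windowedReduce(input)) // List((k1,2), (k2,1))
  }
}
```

Running `main` prints the three partial results Balaji observed, followed by the two final sums he expected.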
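For the follow-up question (building a HashMap[String, Int] per window), one option, sketched here under assumptions, is to collapse the tuples of each window with a fold instead of a second keyed reduce. `toCountMap` is a hypothetical helper in plain Scala; inside Flink it could be called from an all-window function (for example one applied after `windowAll(...)`) so that exactly one map is produced per window. It builds an immutable `Map` rather than a mutable `HashMap`.

```scala
// Hypothetical helper: collapse the (key, count) tuples of one window into a
// single Map[String, Int], summing counts that share a key.
object WindowToMap {
  def toCountMap(input: Iterable[(String, Int)]): Map[String, Int] =
    input.foldLeft(Map.empty[String, Int]) { case (acc, (key, count)) =>
      acc.updated(key, acc.getOrElse(key, 0) + count)
    }

  def main(args: Array[String]): Unit = {
    val window = List(("k1", 1), ("k1", 1), ("k2", 1))
    println(toCountMap(window)) // Map(k1 -> 2, k2 -> 1)
  }
}
```

Because the fold sees the whole window at once, it emits one complete map per window instead of a stream of partial sums.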