Does the reduce function have a bug?

Posted by Balaji Rajagopalan on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/does-reduce-function-has-a-bug-tp5747.html

I have keyed an input stream of type DataStream[(String, Int)] and written a reduce on the keyed stream. The reduce is a simple one that sums the integer values for the same key.

val stream: DataStream[(String, Int)] = ???  // placeholder for the actual source
val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
keyedStream.print()

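import org.apache.flink.api.common.functions.ReduceFunction

// Sums the Int values of two records that share the same key.
class MyReduceFunction extends ReduceFunction[(String, Int)] {
  override def reduce(in: (String, Int), in1: (String, Int)): (String, Int) = {
    (in._1, in._2 + in1._2)
  }
}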

Here is my sample input stream. 
("k1",1)
("k1",1)
("k2",1)

I expected the above program to output:
("k1",2)
("k2",1) 

whereas I got this:
("k1",1)
("k1",2)
("k2",1) 
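
To make the difference between the two outputs concrete, here is a minimal plain-Scala sketch with no Flink dependency. It applies the same summing logic in two ways: once producing a single final value per key, and once emitting the running aggregate after every element. The second variant is only an assumption about what the keyed reduce might be doing, based on the output I observed. The names `ReduceSketch`, `reduceFn`, `finalPerKey` and `rollingPerKey` are hypothetical helpers for illustration, not Flink APIs.

```scala
// Plain Scala, no Flink dependency. `reduceFn` mirrors MyReduceFunction.reduce.
object ReduceSketch {
  type Record = (String, Int)

  def reduceFn(a: Record, b: Record): Record = (a._1, a._2 + b._2)

  // Hypothetical helper: one final result per key (the output I expected).
  def finalPerKey(input: Seq[Record]): Seq[Record] =
    input.groupBy(_._1).values.map(_.reduce(reduceFn)).toSeq.sortBy(_._1)

  // Hypothetical helper: emit the running aggregate after every element
  // (assumed semantics that would reproduce the output I observed).
  def rollingPerKey(input: Seq[Record]): Seq[Record] = {
    val state = scala.collection.mutable.Map.empty[String, Record]
    input.map { rec =>
      val updated = state.get(rec._1).map(reduceFn(_, rec)).getOrElse(rec)
      state(rec._1) = updated
      updated
    }
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(("k1", 1), ("k1", 1), ("k2", 1))
    println(finalPerKey(input))   // contains (k1,2), (k2,1)
    println(rollingPerKey(input)) // contains (k1,1), (k1,2), (k2,1)
  }
}
```

With the sample input, `finalPerKey` yields the two records I expected, while `rollingPerKey` yields the three records I actually got.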

Isn't this incorrect output?

Balaji