http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/does-reduce-function-has-a-bug-tp5747.html
I have a keyed input stream built from a DataStream[(String, Int)], and I wrote a reduce on the keyed stream. The reduce is a simple one that sums the integer values of records with the same key.
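import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._

// Placeholder: the input stream of (key, value) pairs.
val stream: DataStream[(String, Int)] = ???
// Group by the String key, then reduce each group with MyReduceFunction.
val reduced = stream.keyBy(_._1).reduce(new MyReduceFunction)
reduced.print()

// Sums the Int values of two records that share the same key.
class MyReduceFunction extends ReduceFunction[(String, Int)] {
  override def reduce(in: (String, Int), in1: (String, Int)): (String, Int) = {
    (in._1, in._2 + in1._2)
  }
}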
Here is my sample input stream.
("k1",1)
("k1",1)
("k2",1)
I expected the program above to output
("k1",2)
("k2",1)
whereas I got this:
("k1",1)
("k1",2)
("k2",1)
Isn't this incorrect output?
Balaji
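
One possible reading of the output above: on an unbounded stream, a keyed reduce in Flink behaves as a rolling aggregation. It combines each incoming record with the last reduced value for that key and emits the updated value right away, because the stream never signals that a key is "complete". The sketch below simulates that behaviour in plain Scala with no Flink dependency. The names RollingReduceSketch, rollingReduce and sum are hypothetical and exist only for this illustration. It is a rough model of the semantics, not Flink's actual implementation.

```scala
// Plain-Scala simulation of a per-key rolling reduce (no Flink dependency).
// All names here are hypothetical and used only for illustration.
object RollingReduceSketch {
  // Same summing logic as MyReduceFunction in the post.
  def sum(a: (String, Int), b: (String, Int)): (String, Int) =
    (a._1, a._2 + b._2)

  // Emits one result per input record: the running aggregate for that record's key.
  def rollingReduce(input: Seq[(String, Int)]): Seq[(String, Int)] = {
    // Last reduced value seen so far for each key.
    val state = scala.collection.mutable.Map.empty[String, (String, Int)]
    input.map { rec =>
      // First record for a key passes through unchanged; later ones are combined.
      val updated = state.get(rec._1).map(sum(_, rec)).getOrElse(rec)
      state(rec._1) = updated
      updated
    }
  }

  def main(args: Array[String]): Unit =
    rollingReduce(Seq(("k1", 1), ("k1", 1), ("k2", 1))).foreach(println)
}
```

Under this model the sample input yields three results, one per record, which matches the three lines observed. Getting a single final value per key would require bounding the input in some way, for example with a window before the reduce.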