flink datastream reduce


flink datastream reduce

rimin515
Hi,
      In Flink, DataStream has a reduce transformation, but its result is not what I need.
    For example:
    val pairs2 = env.fromCollection(Array(("a", Map(3 -> "rt")), ("a", Map(4 -> "yt")), ("b", Map(5 -> "dfs"))))
    val re = pairs2.keyBy(0).reduce((x1, x2) => (x1._1, x2._2 ++ x1._2))
    re.map { x =>
      println(x)
      x
    }

   The result is:
     (a,Map(3 -> rt))
     (a,Map(3 -> rt, 4 -> yt))
     (b,Map(5 -> dfs))

   but I want to get:
     (a,Map(3 -> rt, 4 -> yt))
     (b,Map(5 -> dfs))


Re: flink datastream reduce

Gábor Gévay
Hello,

The result contains (a,Map(3 -> rt)) because reduce on a keyed stream
emits every intermediate result (this is sometimes called a "rolling
reduce"). It is designed this way because Flink streams are generally
infinite, so there is no last element after which the "final" result
could be emitted. However, you can use windowing [1] to perform the
reduce only on the elements that arrived within certain time intervals,
called windows; in that case only the final result for each window
(and key) is emitted.
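The difference between the two behaviours can be illustrated without Flink at all. Below is a minimal plain-Scala sketch meant to be run as a script (it is not Flink code; the helpers `rollingReduce` and `windowedReduce` are hypothetical names). It assumes the accumulated value is passed as the first argument of the reduce function, and it treats the whole bounded input as a single window.

```scala
// Plain-Scala simulation (no Flink dependency) of rolling vs. windowed reduce.
import scala.collection.mutable

type Rec = (String, Map[Int, String])

val input: Seq[Rec] =
  Seq(("a", Map(3 -> "rt")), ("a", Map(4 -> "yt")), ("b", Map(5 -> "dfs")))

// Same combine logic as in the question: keep the key, merge the two maps.
def combine(x1: Rec, x2: Rec): Rec = (x1._1, x2._2 ++ x1._2)

// Rolling reduce: keep one accumulated value per key and emit it after EVERY element.
// Assumption: the accumulated value is passed as x1 and the new element as x2.
def rollingReduce(in: Seq[Rec]): Seq[Rec] = {
  val state = mutable.Map.empty[String, Rec]
  in.map { r =>
    val updated = state.get(r._1).map(acc => combine(acc, r)).getOrElse(r)
    state(r._1) = updated // remember the running value for this key
    updated               // emitted downstream, including intermediate values
  }
}

// Windowed reduce, assuming the whole bounded input falls into one window:
// each key's elements are combined and only the final value per key is emitted.
def windowedReduce(in: Seq[Rec]): Seq[Rec] =
  in.groupBy(_._1).values.map(_.reduce(combine)).toSeq.sortBy(_._1)

rollingReduce(input).foreach(println)  // three records, including an intermediate one for "a"
windowedReduce(input).foreach(println) // one record per key
```

In Flink itself, the second behaviour corresponds to applying a window between keyBy and reduce, as described in [1]; which window assigner fits depends on the application.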

Best,
Gábor

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html



(In reply to the original message of 2016-08-29 11:08 GMT+02:00.)