TimeWindow doesn't trigger reduce function

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

TimeWindow doesn't trigger reduce function

Soheil Pourbafrani
Hi, My stream data is in a type of Tuple2<Long, String> that contains the timestamp (in second) and data, respectively. The source will generate 120 sample every second. Using the following code I want to get data in every second and then apply the reduce function on them.

temp.keyBy( 0).timeWindow(Time.seconds(1))
.reduce(new ReduceFunction<Tuple2<Long, String>>() {
@Override
public Tuple2<Long, String> reduce(Tuple2<Long, String> longStringTuple2, Tuple2<Long, String> t1) throws Exception {
System.out.println("reduced");
return new Tuple2<>(longStringTuple2.f0, longStringTuple2.f1 + "," + t1.f1) ;
}
}).print() ;
I expected it print reduced data for every second, according to the reduce function, but it just print the test line
System.out.println("reduced");
that I put in reduce function to see if it enter the reduce function or not. I can confirm the data are entering in temp variable.

What is the problem ? Should I implement a trigger function
Reply | Threaded
Open this post in threaded view
|

Re: TimeWindow doesn't trigger reduce function

Hequn Cheng
Hi Soheil,

It seems you job stops within 1 second? 
The processing time window doesn't output data if time hasn't reach the window end. While event time window will output a final watermark during close() to avoid this problem.
You can try to increase the running time of your job to get the output data.
 
Best, Hequn

On Fri, Jul 13, 2018 at 6:37 PM, Soheil Pourbafrani <[hidden email]> wrote:
Hi, My stream data is in a type of Tuple2<Long, String> that contains the timestamp (in second) and data, respectively. The source will generate 120 sample every second. Using the following code I want to get data in every second and then apply the reduce function on them.

temp.keyBy( 0).timeWindow(Time.seconds(1))
.reduce(new ReduceFunction<Tuple2<Long, String>>() {
@Override
public Tuple2<Long, String> reduce(Tuple2<Long, String> longStringTuple2, Tuple2<Long, String> t1) throws Exception {
System.out.println("reduced");
return new Tuple2<>(longStringTuple2.f0, longStringTuple2.f1 + "," + t1.f1) ;
}
}).print() ;
I expected it print reduced data for every second, according to the reduce function, but it just print the test line
System.out.println("reduced");
that I put in reduce function to see if it enter the reduce function or not. I can confirm the data are entering in temp variable.

What is the problem ? Should I implement a trigger function