Datastream reset variable in every-time window in map function

subashbasnet
Hello there,

I need to reset a variable after every time window in a map function. Once I set a value on the variable, it carries over into the next time window. I want the variable to start from its original value in every new time window.
For example:
DataStream<Tuple1<X>> grid = Y
        .map(new AddCountAppender())
        .keyBy(2)
        .timeWindow(Time.seconds(10))
        .reduce(new test())
        .map(new testMap());

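public static final class FindOutlierGrid
        extends RichMapFunction<Tuple1<X>, Tuple1<X>> {
    // Instance field: it lives as long as the operator instance,
    // so its value is shared across all windows.
    int count = 0;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    public FindOutlierGrid() {
        this.count = 0;
    }

    @Override
    public Tuple1<X> map(Tuple1<X> gd) throws Exception {
        count++;
        // Assumed: pass the element through unchanged (the original snippet had no return).
        return gd;
    }
}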

In the map function above, I want count to be zero at the start of every time window. However, count is zero only at startup and never gets reset to zero again. How can I reset it for every new time window?

Best Regards,
Subash Basnet

Re: Datastream reset variable in every-time window in map function

Till Rohrmann
Hi Subash,

If you do the outlier detection as part of your window function or reducer, you don't have to store state in your operator. Alternatively, you could clear the state every time the window function is called, since each call indicates that a window has finished.
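
To illustrate the first option, here is a minimal, Flink-free sketch in plain Java. It only models the idea: the counter is a local variable of the method that runs once per finished window, so it starts at zero for every window. All names (PerWindowCount, countOutliers, countPerWindow) and the threshold-based outlier rule are hypothetical and not taken from the original job. In Flink, the per-window method would roughly correspond to the body of a window function such as WindowFunction.apply, which receives all elements of one window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PerWindowCount {

    // Stand-in for the body of a window function: called once per finished window.
    // The threshold rule is a hypothetical placeholder for the real outlier check.
    static int countOutliers(List<Double> windowElements, double threshold) {
        int count = 0; // local variable, so every window starts from zero
        for (double value : windowElements) {
            if (value > threshold) {
                count++;
            }
        }
        return count;
    }

    // Assigns each (timestamp, value) pair to a tumbling window of the given size
    // and returns one count per window, ordered by window start time.
    static List<Integer> countPerWindow(long[] timestampsMillis, double[] values,
                                        long windowSizeMillis, double threshold) {
        Map<Long, List<Double>> windows = new TreeMap<>();
        for (int i = 0; i < timestampsMillis.length; i++) {
            long windowStart = timestampsMillis[i] - (timestampsMillis[i] % windowSizeMillis);
            windows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(values[i]);
        }
        List<Integer> counts = new ArrayList<>();
        for (List<Double> elements : windows.values()) {
            counts.add(countOutliers(elements, threshold));
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] timestampsMillis = {1_000, 4_000, 9_000, 12_000, 15_000, 27_000};
        double[] values = {5.0, 50.0, 70.0, 3.0, 90.0, 80.0};
        // Three 10-second windows: [0s, 10s), [10s, 20s), [20s, 30s)
        System.out.println(countPerWindow(timestampsMillis, values, 10_000, 10.0));
    }
}
```

With the sample data in main, the three 10-second windows yield the counts [2, 1, 1]; nothing carries over from one window to the next because no state is kept outside the per-window call.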

Cheers,
Till

In reply to the post by subash basnet of Sun, Nov 6, 2016 at 10:15 PM.