Thanks for the replies. Here is a code snippet of what we want to achieve.
We have sliding windows of 24 hours with a 5-minute slide:
inStream
    .filter(Objects::nonNull)
    .keyBy("tenant")
    .window(SlidingProcessingTimeWindows.of(Time.minutes(1440), Time.minutes(5)))
    .fold(new DefaultVector(), new CalculationFold(), new MetricCalculationApply());
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;

public class CalculationFold implements FoldFunction<Event, DefaultVector>
{
    // Never initialized: FoldFunction is not a RichFunction, so there is no
    // open() in which to create this state and no getRuntimeContext() to get it from.
    private transient MapState<String, DefaultProductMetricVector> products;
    private transient MapStateDescriptor<String, DefaultProductMetricVector> descr;

    @Override
    public DefaultVector fold(DefaultVector stats, Event event) throws Exception
    {
        if (products.contains(event.getProductId()))
        {
            DefaultProductMetricVector product = products.get(event.getProductId());
            product.updatePrice(event.getPrice());
            products.put(event.getProductId(), product);
        }
        else
        {
            DefaultProductMetricVector product = new DefaultProductMetricVector();
            product.updatePrice(event.getPrice());
            products.put(event.getProductId(), product);
        }
        return stats;
    }

    // This is what we would like to write, but FoldFunction does not allow
    // an open() method or this.getRuntimeContext():
    //
    // public void open(Configuration parameters) throws Exception
    // {
    //     descr = new MapStateDescriptor<>("product", String.class, DefaultProductMetricVector.class);
    //     products = this.getRuntimeContext().getMapState(descr);
    // }
}
We expect millions of unique products in the 24-hour window, which is why we want to store the state of each product (a DefaultProductMetricVector instance) in RocksDB. Otherwise, my understanding is that if I instantiate a Java HashMap of products inside the DefaultVector fold accumulator, then for each incoming event the full set of products will be deserialized onto the heap, which will eventually cause an out-of-memory error.
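For reference, this is the on-heap alternative we want to avoid. A minimal sketch, assuming DefaultVector simply wraps a java.util.HashMap (the field and method names here are hypothetical; DefaultProductMetricVector.updatePrice is the same method used above):

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class DefaultVector implements Serializable
{
    // The whole product map lives inside the window accumulator. Since the
    // accumulator is a single state value, per our understanding every fold()
    // call deserializes the complete map onto the heap, so millions of
    // products per window would eventually exhaust memory.
    private final Map<String, DefaultProductMetricVector> products = new HashMap<>();

    public void updatePrice(String productId, double price)
    {
        products.computeIfAbsent(productId, id -> new DefaultProductMetricVector())
                .updatePrice(price);
    }
}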
Could you please tell us how to solve this problem?
Thanks.