Incremental aggregation using Fold and failure recovery

Incremental aggregation using Fold and failure recovery

hassahma
Hi All,

I am collecting millions of events per hour for N products, where N can be up to 50k. I use the following fold mechanism with a sliding window:

final DataStream<WindowStats> eventStream = inputStream
    .keyBy(TENANT, CATEGORY)
    // 1-hour windows that slide every 5 minutes
    .window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(5)))
    .fold(new WindowStats(), new ProductAggregationMapper(), new ProductAggregationWindowFunction());

In the WindowStats class, I keep a HashMap<String, ProductMetric> that maps each product ID to its ProductMetric. So for 50k products, the map within WindowStats will hold 50k entries.
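A minimal plain-Java sketch of the accumulator described above, outside Flink. It assumes a hypothetical ProductMetric that only tracks an event count and reduces each event to its product ID; the real class holds more metrics. It shows why the map size is bounded by the number of distinct products rather than the number of events.

```java
import java.util.HashMap;
import java.util.Map;

public class WindowStatsSketch {

    // Hypothetical, simplified per-product metrics (the real ProductMetric holds more fields).
    static class ProductMetric {
        long eventCount;
    }

    // Accumulator: one map entry per distinct product ID, not one per event.
    static class WindowStats {
        final Map<String, ProductMetric> metrics = new HashMap<>();

        // Incremental fold step: called once per incoming event.
        WindowStats add(String productId) {
            metrics.computeIfAbsent(productId, id -> new ProductMetric()).eventCount++;
            return this;
        }
    }

    public static void main(String[] args) {
        WindowStats stats = new WindowStats();
        // 1,000,000 events spread evenly over 50,000 product IDs.
        for (int i = 0; i < 1_000_000; i++) {
            stats.add("product-" + (i % 50_000));
        }
        System.out.println(stats.metrics.size());                      // 50000
        System.out.println(stats.metrics.get("product-0").eventCount); // 20
    }
}
```

However many events arrive, the accumulator grows only when a new product ID shows up.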

My question is: if I enable checkpointing (env.enableCheckpointing(1000)), will the WindowStats instance for each existing window be checkpointed automatically and restored on recovery? If not, how can I better implement the above use case to store the product metrics using the fold operation?

Thanks for all the help.

Best Regards,

Fwd: Incremental aggregation using Fold and failure recovery

hassahma
Any thoughts on this problem please?


Hi All,

I am collecting millions of events per 24 hours for N products, where N can be up to 50k. I use the following fold mechanism with a sliding window:

final DataStream<WindowStats> eventStream = inputStream
    .keyBy(TENANT, CATEGORY)
    // 24-hour windows that slide every 5 minutes
    .window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
    .fold(new WindowStats(), new ProductAggregationMapper(), new ProductAggregationWindowFunction());

In the WindowStats class, I keep a HashMap<String, ProductMetric> keyed by product ID, which holds each product's event count and various other metrics. So for 50k products, the map within a WindowStats instance holds 50k entries instead of millions of events, because the fold function processes each event as it arrives.

My question is: if I enable checkpointing (env.enableCheckpointing(1000)), will the WindowStats instance for each existing window be checkpointed automatically and restored on recovery? If not, how can I better implement the above use case to store the state of the WindowStats object within the fold operation?

Thanks for all the help.

Best Regards,

Re: Fwd: Incremental aggregation using Fold and failure recovery

Tzu-Li (Gordon) Tai
Hi Ahmad,

Yes, that is correct. The aggregated fold value (i.e. your WindowStats instance) will be checkpointed by Flink as managed state, and restored from the last complete checkpoint in case of failures.
One comment on using the fold function: if what you’re essentially doing in the fold is just collecting the elements of the windows per key and performing the actual aggregation in the window function, then you don't need the fold.
A generic window function should suit that case. See [1].

Cheers,
Gordon

On 29 June 2017 at 5:11:58 PM, Ahmad Hassan ([hidden email]) wrote:

Re: Fwd: Incremental aggregation using Fold and failure recovery

hassahma
Hi Gordon,

Thanks for the details. I am using fold to process events and maintain statistics for each product ID within the WindowStats instance. Fold is much more efficient here because there can be millions of events but fewer than 50k unique products. If I used a generic window function instead, it would be less efficient: the window function would receive a collection of millions of events, and those events would be replicated for every sliding window, since Flink assigns each event to every overlapping sliding window.
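To make the replication point concrete, here is a small plain-Java sketch (not Flink code) of how a sliding assigner maps one timestamp to its overlapping windows. The arithmetic is an illustrative re-implementation that assumes a zero window offset; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class SlidingWindowCount {

    // Returns the start of every window [start, start + size) that contains ts,
    // for windows of length `size` that slide by `slide` (zero offset assumed).
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest slide boundary <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long fiveMin = TimeUnit.MINUTES.toMillis(5);
        long ts = 1_498_737_600_123L; // arbitrary example timestamp in ms
        System.out.println(windowStarts(ts, TimeUnit.HOURS.toMillis(1), fiveMin).size());  // 12
        System.out.println(windowStarts(ts, TimeUnit.HOURS.toMillis(24), fiveMin).size()); // 288
    }
}
```

With a 5-minute slide, each event falls into 12 windows for the 1-hour size and 288 for the 24-hour size. As far as I understand Flink's window operator, incremental aggregation keeps one accumulator per key and window, so with fold (or an AggregateFunction) each event updates up to 288 WindowStats instances rather than being buffered 288 times, and each of those accumulators stays bounded by the number of distinct products.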

Also, can you give me an idea of how to use AggregateFunction in the latest Flink to replace the following fold function?

final DataStream<WindowStats> eventStream = inputStream
    .keyBy(TENANT, CATEGORY)
    // 1-hour windows that slide every 5 minutes
    .window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(5)))
    .fold(new WindowStats(), new ProductAggregationMapper(), new ProductAggregationWindowFunction());

Thanks!

On 29 June 2017 at 12:57, Tzu-Li (Gordon) Tai <[hidden email]> wrote:

Re: Fwd: Incremental aggregation using Fold and failure recovery

Tzu-Li (Gordon) Tai
I see. Then yes, a fold operation would be more efficient here.


However, can you give an idea of how to use AggregateFunction in the latest Flink to replace the following fold function?

Sure! The documentation for 1.3 is still lagging a bit behind for some of the new features, but the Javadoc for `AggregateFunction` should be fairly self-explanatory.

As a quick sketch, here’s what you would do to achieve the same thing:

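import java.util.Map;
import org.apache.flink.api.common.functions.AggregateFunction;

// IN is a placeholder for your input event type; the result is the products map.
public class WindowStatsAggregator<IN>
        implements AggregateFunction<IN, WindowStats, Map<String, ProductMetric>> {

    @Override
    public WindowStats createAccumulator() {
        return new WindowStats();
    }

    @Override
    public WindowStats merge(WindowStats a, WindowStats b) {
        // merge the unique products map of b into a, then return the merged accumulator
        return a;
    }

    @Override
    public void add(IN value, WindowStats acc) {
        // update your unique products map
    }

    @Override
    public Map<String, ProductMetric> getResult(WindowStats acc) {
        return acc.getMap();
    }
}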

As you can see, the `AggregateFunction` is more generic, and should subsume whatever you were previously doing with fold.
Your previous `WindowStats` class is basically the state accumulator, and you need to implement how to update it, merge two accumulators, and retrieve the final accumulated result.
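The four responsibilities listed above can be exercised outside Flink with a small stand-in. The `Agg` interface below is a hypothetical local copy of the method shapes of Flink 1.3's `AggregateFunction`, used only so the sketch runs without Flink on the classpath, and simple per-product counts stand in for `WindowStats`.

```java
import java.util.HashMap;
import java.util.Map;

public class AggregateLifecycle {

    // Hypothetical local stand-in mirroring the four methods of Flink 1.3's AggregateFunction.
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC acc);
        OUT getResult(ACC acc);
        ACC merge(ACC a, ACC b);
    }

    // Accumulator: event count per product ID (a simplified stand-in for WindowStats).
    static class ProductCounts implements Agg<String, Map<String, Long>, Map<String, Long>> {
        public Map<String, Long> createAccumulator() {
            return new HashMap<>();
        }

        public void add(String productId, Map<String, Long> acc) {
            acc.merge(productId, 1L, Long::sum); // increment this product's count
        }

        public Map<String, Long> getResult(Map<String, Long> acc) {
            return acc;
        }

        public Map<String, Long> merge(Map<String, Long> a, Map<String, Long> b) {
            // Fold every entry of b into a, summing counts for products seen in both.
            b.forEach((id, n) -> a.merge(id, n, Long::sum));
            return a;
        }
    }

    public static void main(String[] args) {
        ProductCounts f = new ProductCounts();
        Map<String, Long> left = f.createAccumulator();
        Map<String, Long> right = f.createAccumulator();
        f.add("p1", left);
        f.add("p2", left);
        f.add("p1", right);
        System.out.println(f.getResult(f.merge(left, right))); // {p1=2, p2=1}
    }
}
```

As far as I know, Flink only calls `merge` for merging window assigners such as session windows; with the sliding windows in this thread, each accumulator is built purely through `add` calls, but `merge` still has to be implemented correctly.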

For more details, I would point you to the class Javadoc of `AggregateFunction`.

Best,
Gordon

On 29 June 2017 at 8:06:25 PM, Ahmad Hassan ([hidden email]) wrote:

Re: Fwd: Incremental aggregation using Fold and failure recovery

Tzu-Li (Gordon) Tai
Sorry, one typo.
public AverageAccumulator merge(WindowStats a, WindowStats b) {
should be:
public WindowStats merge(WindowStats a, WindowStats b) {

On 29 June 2017 at 8:22:34 PM, Tzu-Li (Gordon) Tai ([hidden email]) wrote:

Re: Fwd: Incremental aggregation using Fold and failure recovery

hassahma
Thanks a lot Gordon!

On 29 June 2017 at 13:39, Tzu-Li (Gordon) Tai <[hidden email]> wrote: