Aggregating Movie Rental information in a DynamoDB table using DynamoDB streams and Flink


Joe Hansen
Happy Holidays everyone!

tl;dr: I need to aggregate movie rental information that is being
stored in one DynamoDB table and store a running total of the
aggregation in another table. How do I ensure exactly-once
aggregation?

I currently store movie rental information in a DynamoDB table named
MovieRentals: {movie_title, rental_period_in_days, order_date,
rent_amount}

We have millions of movie rentals happening on any given day.  Our web
application needs to display the aggregated rental amount for any
given movie title.

I am planning to use Flink to aggregate rental amounts by movie_title
on the MovieRentals DynamoDB stream and store the aggregated rental
amounts in another DynamoDB table named RentalAmountsByMovie:
{movie_title, total_rental_amount}
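
To make the intended aggregation concrete, here is a minimal sketch in plain Python (not Flink code) of summing rent_amount per movie_title. The function name, the sample titles, and the sample amounts are made up for illustration; only the field names come from the table layout above.

```python
from collections import defaultdict
from decimal import Decimal


def aggregate_by_title(rentals):
    """Return {movie_title: total rent_amount} for an iterable of rental records."""
    totals = defaultdict(Decimal)  # Decimal() starts each title at 0
    for rental in rentals:
        # Decimal avoids binary floating-point drift when summing money.
        totals[rental["movie_title"]] += Decimal(rental["rent_amount"])
    return dict(totals)


# Hypothetical sample records shaped like MovieRentals items.
sample_rentals = [
    {"movie_title": "Alien", "rental_period_in_days": 3,
     "order_date": "2019-12-24", "rent_amount": "3.99"},
    {"movie_title": "Heat", "rental_period_in_days": 2,
     "order_date": "2019-12-24", "rent_amount": "4.99"},
    {"movie_title": "Alien", "rental_period_in_days": 1,
     "order_date": "2019-12-25", "rent_amount": "3.99"},
]

totals = aggregate_by_title(sample_rentals)
```

In a streaming job the same sum would be kept as keyed state per movie_title rather than recomputed from a list.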

How do I ensure that the RentalAmountsByMovie amounts are accurate?
That is, how do I prevent the results of any checkpoint from updating
the RentalAmountsByMovie table records more than once?

1) Do I need to store checkpoint ids in the RentalAmountsByMovie table
and do conditional updates to handle the scenario described above?
2) I could implement a TwoPhaseCommitSinkFunction that talks to
DynamoDB. However, according to the Flink documentation, the commit
function can be called more than once and hence needs to be
idempotent. So even this solution requires checkpoint ids to be stored
in the target store.
3) Another pattern seems to be storing the time-window aggregation
results in the RentalAmountsByMovie table. The webapp would then have
to compute the running total on the fly. I don't like this solution
because of its latency implications for the webapp.
4) Maybe I can use Flink's Queryable State feature. However, that
feature seems to be in Beta:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/queryable_state.html
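
The checkpoint-id idea behind options 1 and 2 can be sketched as follows. This is a plain Python model, not DynamoDB or Flink code: the class, the method name apply_delta, and the last_checkpoint_id attribute are assumptions made for illustration, and amounts are held as integer cents. It also assumes each movie_title receives at most one delta per checkpoint and that checkpoint ids only increase. Against DynamoDB, the check and the increment would presumably be expressed as one conditional UpdateItem call so that they happen atomically.

```python
class RentalAmountsByMovie:
    """In-memory stand-in for the target table (not a DynamoDB client)."""

    def __init__(self):
        # movie_title -> {"total_rental_amount": int, "last_checkpoint_id": int}
        self.items = {}

    def apply_delta(self, movie_title, delta, checkpoint_id):
        """Add delta only if this checkpoint has not been applied to the item.

        Returns True if the item was updated, False if the write was skipped.
        """
        item = self.items.setdefault(
            movie_title, {"total_rental_amount": 0, "last_checkpoint_id": -1}
        )
        if checkpoint_id <= item["last_checkpoint_id"]:
            return False  # replayed commit: the condition fails, nothing changes
        item["total_rental_amount"] += delta
        item["last_checkpoint_id"] = checkpoint_id
        return True


table = RentalAmountsByMovie()
first = table.apply_delta("Alien", 798, checkpoint_id=7)   # applied
replay = table.apply_delta("Alien", 798, checkpoint_id=7)  # skipped as a duplicate
later = table.apply_delta("Alien", 399, checkpoint_id=8)   # applied
total = table.items["Alien"]["total_rental_amount"]
```

The trade-off is one extra attribute per item in the target table, in exchange for commits that can be retried safely.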

I imagine this is a very common aggregation use case. How do folks
usually handle **updating aggregated results in Flink external
sinks**?

I appreciate any pointers. Happy to provide more details if needed.

Thanks!

Re: Aggregating Movie Rental information in a DynamoDB table using DynamoDB streams and Flink

Zhijiang(wangzhijiang999)
Hi Joe,

What you need is effectively exactly-once semantics for an external sink. I think your option 2, with TwoPhaseCommitSinkFunction, is the right way to go.
Unfortunately I am not very familiar with this part, so I cannot give you specific suggestions for using it, especially regarding your concern about storing the checkpoint id.
After the holidays, I expect that people with more experience in this area can offer you more expert advice. :)

For now, you can refer to the simple implementation in TwoPhaseCommitSinkFunctionTest#ContentDumpSinkFunction and the more complex one in FlinkKafkaProducer for more insight.
In addition, the StreamingFileSink also implements exactly-once semantics for a sink. You might refer to it as well.
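
As a rough illustration of the two-phase commit lifecycle, here is a simplified Python model. It is not the Flink API: the method names only mirror the invoke, preCommit, and commit steps of TwoPhaseCommitSinkFunction, and ExternalStore, commit_once, and last_committed_checkpoint are hypothetical names. The sketch assumes the store can apply one commit atomically and that checkpoint ids only increase. Amounts are integer cents.

```python
class ExternalStore:
    """In-memory stand-in for the target table plus a commit marker."""

    def __init__(self):
        self.totals = {}                      # movie_title -> running total
        self.last_committed_checkpoint = -1   # highest checkpoint id applied

    def commit_once(self, checkpoint_id, deltas):
        """Apply deltas unless this checkpoint was already committed."""
        if checkpoint_id <= self.last_committed_checkpoint:
            return False  # duplicate commit call: ignore it
        for title, delta in deltas.items():
            self.totals[title] = self.totals.get(title, 0) + delta
        self.last_committed_checkpoint = checkpoint_id
        return True


class TwoPhaseCommitSketch:
    """Buffers deltas per transaction and publishes them only on commit."""

    def __init__(self, store):
        self.store = store
        self.current = {}   # open transaction: movie_title -> delta
        self.pending = {}   # checkpoint_id -> pre-committed deltas

    def invoke(self, movie_title, amount):
        self.current[movie_title] = self.current.get(movie_title, 0) + amount

    def pre_commit(self, checkpoint_id):
        # On checkpoint: freeze the open transaction and start a new one.
        self.pending[checkpoint_id] = self.current
        self.current = {}

    def commit(self, checkpoint_id):
        # May run more than once for the same checkpoint after a recovery.
        return self.store.commit_once(checkpoint_id, self.pending[checkpoint_id])


store = ExternalStore()
sink = TwoPhaseCommitSketch(store)
sink.invoke("Alien", 399)
sink.invoke("Alien", 399)
sink.invoke("Heat", 499)
sink.pre_commit(1)
visible_before_commit = dict(store.totals)  # nothing published yet
first_commit = sink.commit(1)
replayed_commit = sink.commit(1)            # simulated replay after recovery
```

The pending deltas are kept after commit here only so that the replay can be simulated; a real sink would restore them from checkpointed state.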

Best,
Zhijiang

------------------------------------------------------------------
From: Joe Hansen <[hidden email]>
Send Time: 2019 Dec. 26 (Thu.) 01:42
To: user <[hidden email]>
Subject: Aggregating Movie Rental information in a DynamoDB table using DynamoDB streams and Flink
