Use flink to calculate sum of the inventory under certain conditions

8 messages
Use flink to calculate sum of the inventory under certain conditions

Jiawei Wu
Hi flink users,

We have a problem and think Flink may be a good solution for it. But I'm new to Flink and hope to get some insights from the Flink community :)

Here is the problem. Suppose we have a DynamoDB table which stores the inventory data; the schema is like:

* vendorId (primary key)
* inventory name
* inventory units
* inbound time
...

This DDB table keeps changing, since we have inventory coming in and being removed. Every change triggers a DynamoDB stream event.
We need to calculate, for each vendor, the sum of units over all inventory older than 15 days, like this:
> select vendorId, sum(inventory_units)
> from dynamodb
> where current_date - inbound_time > 15
> group by vendorId
We don't want to schedule a daily batch job, so we are trying to work on a micro-batch solution in Flink, and publish this data to another DynamoDB table.

A draft idea is to use the total units minus the <15-day units, since both of them have event triggers. But we have no detailed solution yet.
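A rough sketch of that draft idea in plain Python (the records and dates are made up; this is not Flink code, just the arithmetic):

```python
from datetime import datetime, timedelta

# Hypothetical inventory records: (vendorId, units, inbound_time).
records = [
    ("vendorA", 10, datetime(2020, 2, 10)),  # older than 15 days on 2020-03-03
    ("vendorA", 5,  datetime(2020, 3, 1)),   # recent
    ("vendorB", 7,  datetime(2020, 1, 20)),  # older than 15 days
]

def aged_units(records, today, days=15):
    """Units older than `days`, computed as total minus recent, per vendor."""
    cutoff = today - timedelta(days=days)
    totals, recent = {}, {}
    for vendor, units, inbound in records:
        totals[vendor] = totals.get(vendor, 0) + units
        if inbound > cutoff:
            recent[vendor] = recent.get(vendor, 0) + units
    return {v: totals[v] - recent.get(v, 0) for v in totals}

print(aged_units(records, datetime(2020, 3, 3)))
# {'vendorA': 10, 'vendorB': 7}
```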

Could anyone help provide some insights here?

Thanks,
J.
Reply | Threaded
Open this post in threaded view
|

Re: Use flink to calculate sum of the inventory under certain conditions

rmetzger0
Hey Jiawei,

I'm sorry that you haven't received an answer yet.

So you basically have a stream of DynamoDB table updates (let's call it a CDC stream), and you would like to maintain the inventory of the last 15 days for each vendor.
Whenever there's an update in the inventory data (a new event arrives in the CDC stream), you want to produce a new event with the inventory count.

If I'm not mistaken, you will need to keep all the inventory in Flink's state to have an accurate count and to drop old records when they expire.
There are two options for maintaining the state:
- in memory (using the FsStateBackend)
- on disk (using the embedded RocksDBStateBackend)

I would recommend starting with the RocksDBStateBackend. It will work as long as your state fits on your machines' hard disks (we'll probably not have an issue there :) )
If you run into performance issues, you can consider switching to a memory-based backend (by then, you should have some knowledge about your state size).

I personally would give it a try with ProcessFunction first.
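A plain-Python stand-in for what such a keyed ProcessFunction could look like (the class and method names here are invented for illustration and are not Flink's API; in real Flink you would register timers via the TimerService of a KeyedProcessFunction). It keeps per-vendor records in "state" and emits an updated sum of aged units whenever an event arrives or a record crosses the 15-day boundary:

```python
import heapq

class VendorAggregator:
    """Hypothetical sketch: keyed state plus simulated per-record timers."""

    DAYS = 15

    def __init__(self):
        self.state = {}    # vendor -> list of (inbound_day, units)
        self.timers = []   # min-heap of (fire_day, vendor)

    def on_event(self, vendor, units, inbound_day, now_day):
        """Handle one CDC event; returns the (vendor, aged_sum) records emitted."""
        self.state.setdefault(vendor, []).append((inbound_day, units))
        # A timer fires on the first day this record counts as "older than 15 days".
        heapq.heappush(self.timers, (inbound_day + self.DAYS + 1, vendor))
        return self._fire_timers(now_day) + [(vendor, self._aged_sum(vendor, now_day))]

    def _fire_timers(self, now_day):
        out = []
        while self.timers and self.timers[0][0] <= now_day:
            _, vendor = heapq.heappop(self.timers)
            out.append((vendor, self._aged_sum(vendor, now_day)))
        return out

    def _aged_sum(self, vendor, now_day):
        return sum(u for d, u in self.state.get(vendor, []) if now_day - d > self.DAYS)
```

For example, an event on day 0 emits an aged sum of 0; a second event for the same vendor on day 20 first fires the pending timer (the day-0 units are now aged) and then emits the updated sum.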

For reading the data from DynamoDB, there's an undocumented feature for it in Flink. This is an example for reading from a DynamoDB stream: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromDynamoDBStreams.java

For writing to DynamoDB, there is currently no official sink in Flink. It should be fairly straightforward to implement one using Flink's SinkFunction interface.
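For illustration, here is a plain-Python sketch of the batching logic such a custom sink could implement (this is not the SinkFunction interface itself; `write_batch` stands in for the real AWS client call, and the 25-item cap mirrors DynamoDB's BatchWriteItem limit):

```python
class BufferingDynamoSink:
    """Hypothetical sketch of a batching sink: buffer records, flush in batches."""

    MAX_BATCH = 25  # BatchWriteItem accepts at most 25 items per request

    def __init__(self, write_batch):
        self.write_batch = write_batch  # stand-in for the DynamoDB client call
        self.buffer = []

    def invoke(self, record):
        """Called once per incoming record, like SinkFunction.invoke()."""
        self.buffer.append(record)
        if len(self.buffer) >= self.MAX_BATCH:
            self.flush()

    def flush(self):
        """Also worth calling on checkpoint/close so no records are lost."""
        if self.buffer:
            self.write_batch(self.buffer)
            self.buffer = []
```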

I hope this answers your question.

Best,
Robert




On Tue, Mar 3, 2020 at 3:25 AM Jiawei Wu <[hidden email]> wrote:

Re: Use flink to calculate sum of the inventory under certain conditions

Jiawei Wu
Hi Robert,

Your answer really helps.

About the problem, we have 2 choices. The first is using Flink as described in this email thread. The second is using AWS Lambda triggered by the CDC stream to compute the latest 15 days of records, which is a workaround solution and looks not as elegant as Flink to me.

Currently we decided to choose AWS Lambda because we are familiar with it and, most importantly, it leads to nearly no operational burden. But we are actively looking for a comparison between Lambda and Flink and want to know in which situations we would prefer Flink over Lambda. Several teams in our company are already in a hot debate about the comparison, and the biggest concern is the non-functional requirements of Flink, such as fault tolerance, recovery, etc.

I also searched the internet but found there are nearly no comparisons between Lambda and Flink except for their market share :-( I'm wondering what you think of this? Any comments from the Flink community are appreciated.

Thanks,


On Mon, Mar 9, 2020 at 8:22 PM Robert Metzger <[hidden email]> wrote:

Re: Use flink to calculate sum of the inventory under certain conditions

Kurt Young
Hi Jiawei,

Sorry, I still didn't fully get your question. What's wrong with your proposed SQL?

> select vendorId, sum(inventory_units)
> from dynamodb
> where current_date - inbound_time > 15
> group by vendorId

My guess is that such a query would only trigger the calculation on new events. So if a very old inventory record, say one inbounded 17 days ago, has no new events coming in for it, then the calculation would not be triggered and you can't include it in the sum, right?

Best,
Kurt


On Wed, Mar 11, 2020 at 10:06 AM Jiawei Wu <[hidden email]> wrote:

Re: Use flink to calculate sum of the inventory under certain conditions

Jiawei Wu
Hi Kurt,

What you said is the 1st reason.
The second reason is that this query needs to scan the whole table. I think we can do better :-)

Best,
Jiawei

On Wed, Mar 11, 2020 at 10:52 AM Kurt Young <[hidden email]> wrote:

Re: Use flink to calculate sum of the inventory under certain conditions

Arvid Heise-3
In reply to this post by Jiawei Wu
> About the problem, we have 2 choices. The first is using Flink as described in this email thread. The second is using AWS Lambda triggered by the CDC stream to compute the latest 15 days of records, which is a workaround solution and looks not as elegant as Flink to me.
>
> Currently we decided to choose AWS Lambda because we are familiar with it and, most importantly, it leads to nearly no operational burden. But we are actively looking for a comparison between Lambda and Flink and want to know in which situations we would prefer Flink over Lambda. Several teams in our company are already in a hot debate about the comparison, and the biggest concern is the non-functional requirements of Flink, such as fault tolerance, recovery, etc.
>
> I also searched the internet but found there are nearly no comparisons between Lambda and Flink except for their market share :-( I'm wondering what you think of this? Any comments from the Flink community are appreciated.

You pretty much described the biggest difference already. Doing any more complex operation with Lambda will turn into a mess quickly.

Lambdas currently shine for two use cases because of the ease of operation and unlimited scalability:
- Simple transformations: input -> transform -> output
- Simple database updates (together with Dynamo): input -> lookup by key (db), update by key (db) -> output
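The first pattern is roughly this much code as a Lambda handler (the attribute names here are hypothetical; the `{"S": ...}` / `{"N": ...}` wrappers follow the DynamoDB Streams record format):

```python
def handler(event, context):
    """Sketch of a simple transform Lambda: DynamoDB stream records in,
    (vendorId, units) pairs out. Attribute names are made up for illustration."""
    out = []
    for record in event.get("Records", []):
        # NewImage is absent for REMOVE events, so skip those.
        image = record.get("dynamodb", {}).get("NewImage")
        if image:
            out.append((image["vendorId"]["S"], int(image["inventoryUnits"]["N"])))
    return out
```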

As soon as you exceed point queries (time windows, joins) or have state, Lambdas actually get harder to manage imho. You need a zoo of supporting technologies or sacrifice lots of performance.

In Flink, you have a higher barrier to entry, but as soon as your streaming application grows, it pays off quickly. Data is relocated with processing, such that you don't need to program access patterns yourself.

So I'd decide it on a case-by-case basis for each application. If it's one of the two above-mentioned use cases, just go Lambda. You will not gain much with Flink, especially if you already have the experience.
If you know your application will grow out of these use cases or is more complex to begin with, consider Flink.

There is also one relatively new technology based on Flink called stateful functions [1]. It tries to combine the advanced state processing of Flink with the benefits of Lambdas (albeit scalability is not unlimited). You might want to check that out, as it may solve your use cases.


On Wed, Mar 11, 2020 at 3:06 AM Jiawei Wu <[hidden email]> wrote:

Re: Use flink to calculate sum of the inventory under certain conditions

Kurt Young
> The second reason is that this query needs to scan the whole table. I think we can do better :-)

Not necessarily. You said all the changes trigger a DDB stream, so you can use Flink to consume that stream incrementally.

For the 1st problem, I think you can use the DataStream API and register a timer for every inventory record that gets inbound. If the record is updated before the timeout, you delete the timer; otherwise the timer triggers the calculation after the timeout, and you can compute the total count and emit it whenever an inventory record times out.
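The timer bookkeeping described above boils down to something like this (a tiny plain-Python stand-in, not Flink's actual timer service): one timer per inventory record, deleted if the record is updated or removed before it fires.

```python
class TimerService:
    """Hypothetical sketch of per-record timers with cancel-on-update."""

    def __init__(self):
        self.timers = {}  # inventory_id -> fire_time

    def register(self, inv_id, fire_time):
        self.timers[inv_id] = fire_time  # re-registering overwrites the old timer

    def delete(self, inv_id):
        self.timers.pop(inv_id, None)    # record updated/removed before timeout

    def advance_to(self, now):
        """Return ids whose timers fire by `now`; each fires at most once."""
        fired = [i for i, t in self.timers.items() if t <= now]
        for i in fired:
            del self.timers[i]
        return fired
```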
 
Best,
Kurt


On Wed, Mar 11, 2020 at 4:53 PM Arvid Heise <[hidden email]> wrote:

Re: Use flink to calculate sum of the inventory under certain conditions

Jiawei Wu
Thanks Arvid and Kurt, that's a very helpful discussion.
Currently we will continue with Lambda, but I'll definitely do an A-A test between Lambda and Flink for this case.

Regards,
Jiawei

On Wed, Mar 11, 2020 at 5:40 PM Kurt Young <[hidden email]> wrote:
> The second reason is this query need to scan the whole table. I think we can do better :-)

Not necessarily, you said all the changes will trigger a DDB stream, you can use Flink to consume such
stream incrementally. 

For the 1st problem, I think you can use DataStream API and register a timer on every inventory which
got inbound. If the inventory got updated before timeout, you can delete the timer, otherwise the timer
will trigger the calculation after timeout and you can get the total count and emit that whenever an inventory 
times out.
 
Best,
Kurt


On Wed, Mar 11, 2020 at 4:53 PM Arvid Heise <[hidden email]> wrote:
> About the problem, we have 2 choices. The first one is using Flink as described in this email thread. The second one is using an AWS Lambda triggered by the CDC stream that computes the latest-15-days record, which is a workaround solution and looks less elegant than Flink to me.
>
> Currently we decided to choose AWS Lambda because we are familiar with it and, most importantly, it leads to nearly no operational burden. But we are actively looking for a comparison between Lambda and Flink and want to know in which situations we would prefer Flink over Lambda. Several teams in our company are already in a hot debate about this comparison, and the biggest concern is the non-functional requirements of Flink, such as fault tolerance, recovery, etc.
>
> I also searched the internet but found there are nearly no comparisons between Lambda and Flink except for their market share :-( I'm wondering what you think of this? Any comments from the flink community are appreciated.

You pretty much described the biggest difference already. Doing any more complex operation with Lambda will turn into a mess quickly.

Lambdas currently shine for two use cases because of the ease of operation and unlimited scalability:
- Simple transformations: input -> transform -> output
- Simple database updates (together with Dynamo): input -> lookup by key (db), update by key (db) -> output

As soon as you exceed point queries (time windows, joins) or have state, Lambdas actually get harder to manage imho. You need a zoo of supporting technologies or sacrifice lots of performance.

In Flink, you have a higher barrier to entry, but as soon as your streaming application grows, it pays off quickly. Data is co-located with processing, so you don't need to implement access patterns yourself.

So I'd decide it on a case-by-case basis for each application. If it's one of the two above-mentioned use cases, just go with Lambda. You will not gain much with Flink, especially since you already have the Lambda experience.
If you know your application will grow out of these use cases or is more complex to begin with, consider Flink.

There is also one relatively new technology based on Flink called stateful functions [1]. It tries to combine the advanced state processing of Flink with the benefits of Lambdas (albeit scalability is not unlimited). You might want to check that out, as it may solve your use cases.


On Wed, Mar 11, 2020 at 3:06 AM Jiawei Wu <[hidden email]> wrote:
Hi Robert,

Your answer really helps.

About the problem, we have 2 choices. The first one is using Flink as described in this email thread. The second one is using an AWS Lambda triggered by the CDC stream that computes the latest-15-days record, which is a workaround solution and looks less elegant than Flink to me.

Currently we decided to choose AWS Lambda because we are familiar with it and, most importantly, it leads to nearly no operational burden. But we are actively looking for a comparison between Lambda and Flink and want to know in which situations we would prefer Flink over Lambda. Several teams in our company are already in a hot debate about this comparison, and the biggest concern is the non-functional requirements of Flink, such as fault tolerance, recovery, etc.

I also searched the internet but found there are nearly no comparisons between Lambda and Flink except for their market share :-( I'm wondering what you think of this? Any comments from the flink community are appreciated.

Thanks,


On Mon, Mar 9, 2020 at 8:22 PM Robert Metzger <[hidden email]> wrote:
Hey Jiawei,

I'm sorry that you haven't received an answer yet.

So you basically have a stream of dynamodb table updates (let's call it a CDC stream), and you would like to maintain the inventory of the last 15 days for each vendor.
Whenever there's an update in the inventory data (a new event arrives in the CDC stream), you want to produce a new event with the inventory count.

If I'm not mistaken, you will need to keep all the inventory in Flink's state to have an accurate count and to drop old records when they are expired.
There are two options for maintaining the state:
- in memory (using the FsStateBackend)
- on disk (using the embedded RocksDBStatebackend)

I would recommend starting with the RocksDBStateBackend. It will work as long as your state fits on all your machines hard disks (we'll probably not have an issue there :) )
If you run into performance issues, you can consider switching to a memory based backend (by then, you should have some knowledge about your state size)
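A minimal configuration sketch for enabling the RocksDB backend via flink-conf.yaml (the checkpoint path is a placeholder; adjust it to your storage):

```yaml
state.backend: rocksdb
state.backend.incremental: true                         # checkpoint only changed SST files
state.checkpoints.dir: s3://my-bucket/flink-checkpoints # placeholder path
```

The same choice can also be made programmatically on the StreamExecutionEnvironment, but keeping it in the config lets you switch backends without recompiling the job.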

I personally would give it a try with ProcessFunction first.

For reading the data from DynamoDB, there's an undocumented feature for it in Flink. This is an example for reading from a DynamoDB stream: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromDynamoDBStreams.java

For writing to DynamoDB there is currently no official sink in Flink. It should be fairly straightforward to implement one using Flink's SinkFunction interface.

I hope this answers your question.

Best,
Robert




On Tue, Mar 3, 2020 at 3:25 AM Jiawei Wu <[hidden email]> wrote:
Hi flink users,

We have a problem and think flink may be a good solution for it. But I'm new to Flink and hope I can get some insights from the flink community :)

Here is the problem. Suppose we have a DynamoDB table which stores the inventory data; the schema is like:

* vendorId (primary key)
* inventory name
* inventory units
* inbound time
...

This DDB table keeps changing, since we have inventory arriving and being removed. Every change triggers a DynamoDB stream.
We need to calculate all the inventory units older than 15 days for a specific vendor, like this:
> select vendorId, sum(inventory units)
> from dynamodb
> where today's time - inbound time > 15
> group by vendorId
We don't want to schedule a daily batch job, so we are trying to work on a micro-batch solution in Flink, and publish this data to another DynamoDB table.

A draft idea is to compute the total units minus the <15-days units, since both of them have event triggers. But we have no detailed solution yet.

Could anyone help provide some insights here?

Thanks,
J.