Flink Event specific window


s_penakalapati@yahoo.com
Hi All,

I have a requirement where I need to calculate the total amount of transactions done by each user in the last 1 hour.
Say Customer1 has done 2 transactions, one at 11:00 am and the other at 11:20 am.
Customer2 has done 1 transaction, at 10:00 am.
Customer3 has done 3 transactions, at 11:20 am, 11:40 am and 11:45 am.

Whenever a customer does a transaction, we receive an event in the source topic; we consume the data and need to calculate the total amount spent by the customer in the last 1 hour.

If I receive a new Customer1 transaction event at 11:30 am, I need to calculate the sum of the 3 events done by Customer1 in the last 1 hour (i.e. 11:00, 11:20 and 11:30 am; all 3 events fall in the last 1-hour window).
Now say I receive a new Customer2 transaction event at 11:30 am; for this customer I need to consider only the one event at 11:30 (ignoring the event at 10:00 am, as it does not fall in the last 1 hour).
If Customer3's new transaction is done at 12:40 pm, then for this customer I need to calculate the sum of (11:40 am, 11:45 am and 12:40 pm), as all 3 fall in the last 1 hour.
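The intended look-back can be illustrated in plain Java, independent of Flink (timestamps are minutes since midnight; the amounts 10, 20, 30, 40, 50 are invented for illustration):

```java
import java.util.List;

public class RollingSum {
    // Sum all amounts whose timestamp falls within the hour ending at 'nowMinutes' (inclusive).
    static double sumLastHour(List<long[]> events, long nowMinutes) {
        double sum = 0;
        for (long[] e : events) {               // e[0] = event time in minutes, e[1] = amount
            if (e[0] > nowMinutes - 60 && e[0] <= nowMinutes) {
                sum += e[1];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // Customer1: 11:00 (660) and 11:20 (680), new event at 11:30 (690) -> all three count
        List<long[]> c1 = List.of(new long[]{660, 10}, new long[]{680, 20}, new long[]{690, 30});
        System.out.println(sumLastHour(c1, 690)); // 60.0

        // Customer2: 10:00 (600) is outside the hour ending at 11:30 -> only the 11:30 event counts
        List<long[]> c2 = List.of(new long[]{600, 40}, new long[]{690, 50});
        System.out.println(sumLastHour(c2, 690)); // 50.0
    }
}
```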

Approach I am planning to try:
Every event carries a transaction time, which I use as the event time when assigning the WatermarkStrategy
KeyBy customerId
SlidingEventTimeWindows of 1 hr
then process all elements using a ProcessWindowFunction


Kindly suggest the approach I should follow to achieve the above scenario using Flink Java/SQL. I am using Flink 1.12.0.

Regards,
Sunitha
Re: Flink Event specific window

Arvid Heise-4
Hi Sunitha,

The approach you are describing sounds like you want to use a session window. [1] If you only want to count transactions that happen within the same hour, you want a tumbling window instead.

Your DataStream approach looks solid.

For SQL, there are also session (and tumbling) windows. [2] You can see examples at the bottom of that section.
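The SQL variant can be sketched with Flink 1.12's group-window syntax; the table and column names (transactions, customerId, txTime, amount) are assumptions for illustration, and txTime must be declared as an event-time attribute with a watermark:

```sql
-- Session window: transactions of one customer less than 1 hour apart land in the same window.
SELECT
  customerId,
  SESSION_START(txTime, INTERVAL '1' HOUR) AS wStart,
  SESSION_END(txTime, INTERVAL '1' HOUR)   AS wEnd,
  SUM(amount) AS totalAmount
FROM transactions
GROUP BY customerId, SESSION(txTime, INTERVAL '1' HOUR);

-- Tumbling variant: fixed, non-overlapping 1-hour buckets.
SELECT
  customerId,
  TUMBLE_START(txTime, INTERVAL '1' HOUR) AS wStart,
  SUM(amount) AS totalAmount
FROM transactions
GROUP BY customerId, TUMBLE(txTime, INTERVAL '1' HOUR);
```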


Re: Flink Event specific window

Swagat Mishra
1. What if there is a very high number of users, like a million customers; won't the service crash? Is it advisable to hold the data in memory?

2. What if state-functions are used to calculate the value? How would this approach differ from the one proposed below?

Regards,
Swagat

Re: Flink Event specific window

Arvid Heise-4
1. It always depends on the data volume per user. A million users is not much if you compare it to the biggest Flink installations (Netflix, Alibaba, Pinterest, ...). However, for larger scale and better scalability, I'd recommend the RocksDB state backend. [1]

2. Are you referring to Statefun? I'd say that for your use case, Flink is a better fit. Statefun is more suitable when each actor (= user in your case) acts differently depending on the data, like in a state machine. In your case, the users should all be processed in the same way: even if the windows are opened and closed independently, every user has at most one window open at a given event time. You probably also aggregate all user states in more or less the same way.

Or did you refer to process functions with state? That's certainly possible to implement, but it won't be much faster unless you can exploit some specific properties of your application. An example is written in [2]. I'd recommend first using the regular, built-in windows and only switching to custom code if the performance is insufficient. Custom implementations may be faster now, but they come with a higher maintenance cost, and the built-in windows may be better optimized in the future.

Lastly, if your query is of a relational nature, I'd recommend having a look at Table API/SQL. [3] Unless you really invest a lot of time, you won't be able to write more efficient code than what the Table API generates.
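For reference, switching to the RocksDB state backend in Flink 1.12 is a configuration change; a sketch of the relevant flink-conf.yaml keys (the checkpoint path is a placeholder):

```yaml
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints   # placeholder path
state.backend.incremental: true                    # incremental checkpoints keep checkpoint sizes small
```

Equivalently, it can be set in code via env.setStateBackend(new RocksDBStateBackend(checkpointPath)) in the Flink 1.12 API.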


Re: Flink Event specific window

Swagat Mishra
Hi Arvid,

On 2 - I was referring to stateful functions as an alternative to windows, but in this particular use case I don't think it fits exactly, though a solution could be built around it.

On the overall approach, what's the right way to use Flink SQL here:
Every event has the transaction time which I am using as event time to assign WatermarkStrategy
KeyBy - customerId
SlidingEventTimeWindows of 1 hr 
then process all elements using ProcessWindowFunction

Extending the above:

For the session window, taking the example above, reiterated below:

Say Customer1 has done 2 transactions one at 11:00am and other one at 11:20 am.
Customer2 has done 1 transaction one at 10:00 am 
Customer3 has done 3 transactions one at 11:20 am,  11:40 am and 11:45 am.

1 hour window:
9:30AM - 10:30 AM : Customer 2
10:30 AM - 11:30 AM : Customer 1, Customer 3
11:30 AM - 12:30 PM : Customer 3
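Assuming these buckets are meant as fixed 1-hour tumbling windows offset by 30 minutes, the assignment can be checked in plain Java (timestamps in minutes since midnight), mirroring the window-start formula Flink uses for tumbling windows, start = ts - ((ts - offset) mod size):

```java
public class WindowAssign {
    // Start of the tumbling window that a timestamp falls into.
    static long windowStart(long tsMinutes, long sizeMinutes, long offsetMinutes) {
        return tsMinutes - Math.floorMod(tsMinutes - offsetMinutes, sizeMinutes);
    }

    public static void main(String[] args) {
        // One-hour buckets aligned at half past the hour, matching the table above
        System.out.println(windowStart(600, 60, 30)); // Customer2 @10:00 -> 570 (9:30 bucket)
        System.out.println(windowStart(660, 60, 30)); // Customer1 @11:00 -> 630 (10:30 bucket)
        System.out.println(windowStart(700, 60, 30)); // Customer3 @11:40 -> 690 (11:30 bucket)
    }
}
```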

Questions - how do we access the state?
  1. Will the process window function write to an in-memory SQL table that does not get flushed to a proper backing database, so all the data stays in memory? If yes, can that be queried?
  2. If the process window function writes to a proper backing database, at what point should this happen? Because the API can query the state at any point in time, the data that was flushed might be stale and need recomputation.
  3. How do you recommend RocksDB be used as a state backend? Is that the embedded RocksDB, or do you recommend an external implementation? Embedded RocksDB state is lost when the container is restarted, I guess, so we will have to have an external mechanism for restart/crash recovery?
Regards,
Swagat



Re: Flink Event specific window

Arvid Heise-4
Hi Swagat,

1. Where the data primarily resides depends on the chosen state backend. [1] In most cases, it's written to some file with a memory cache. It's possible to query the state [2], but not with SQL. In fact, it's so basic that we decided to drop the feature in the future to make room for a more sophisticated solution based on replicating the state into an external queryable form, but there is nothing specific yet.
2. It would help if you (re)read the section about state persistence. [3] Basically, the state is updated on every write access of the process function. Flink creates a checkpoint of the state periodically and can recover from these checkpoints. It's also possible to look into these checkpoints with the State Processor API. [4]
3. It's embedded. See above for what happens on failure.
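For context, the periodic checkpointing mentioned in point 2 is also configuration-driven; a sketch of the relevant Flink 1.12 keys (the path is a placeholder):

```yaml
execution.checkpointing.interval: 60s              # take a checkpoint every minute
execution.checkpointing.mode: EXACTLY_ONCE
state.checkpoints.dir: hdfs:///flink/checkpoints   # placeholder path
```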


Re: Flink Event specific window

Swagat Mishra
Thanks Arvid, very helpful.

Re: Flink Event specific window

Swagat Mishra
Hi Arvid,

I sent a separate mail titled "Session Windows - not working as expected", so I am closing this thread.

Please have a look when you have a few minutes; much appreciated.

Regards,
Swagat


On Wed, May 5, 2021 at 7:24 PM Swagat Mishra <[hidden email]> wrote:
Hi Arvid,

I tried a small POC to reproduce the behaviour; somehow I don't see the process function getting called. Am I doing something wrong?

customerStream
        .keyBy(Customer::getIdentifier)
        .window(EventTimeSessionWindows.withGap(Time.seconds(8)))
        .process(new CustomAggregateFunction())
        .print();

The process function looks like below:

public class CustomAggregateFunction extends ProcessWindowFunction<Customer, CustomerAggregate, String, TimeWindow> {

    @Override
    public void process(String key, Context context, Iterable<Customer> iterable, Collector<CustomerAggregate> collector) throws Exception {
        System.out.println("in aggregation");
    }
}

The customer generator:

public class CustomerGenerator implements SourceFunction<Customer> {

    volatile boolean isRunning = true;

    private String[] CUSTOMER_KEY = {"C1", "C2", "C3", "C4", "C5"};

    @Override
    public void run(SourceContext<Customer> sourceContext) throws Exception {
        int counter = 1;

        while (isRunning) {
            Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000);
            System.out.println("Writing customer: " + c);
            sourceContext.collect(c);
            Thread.sleep(1000);
            counter++;
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

The Customer object:

public class Customer {
    private String identifier;
    private LocalTime eventTime;
    private double amount;

    public Customer(String identifier, LocalTime eventTime, double amount) {
        this.identifier = identifier;
        this.amount = amount;
        this.eventTime = eventTime;
    }

    public String getIdentifier() {
        return identifier;
    }

    public LocalTime getEventTime() {
        return eventTime;
    }

    public double getAmount() {
        return amount;
    }

    @Override
    public String toString() {
        return "Customer{" +
                "identifier='" + identifier + '\'' +
                ", eventTime=" + eventTime +
                ", amount=" + amount +
                '}';
    }
}



Or did you refer to processing functions with state? That's certainly possible to implement but it won't be much faster unless you can exploit some specific properties of your application. An example is written in [2]. I'd recommend to first use regular, built-in windows and only switch to custom code if the performance is insufficient. Custom implementations may be faster now, but come with a higher maintenance cost and the built-in windows may be better optimized in future.

Lastly if your query is of relational nature, I'd recommend to have a look at Table API/SQL [3]. Unless you really invest a lot of time, you won't be able to write more efficient code than what Table API is generating.


On Sun, Apr 25, 2021 at 11:46 PM Swagat Mishra <[hidden email]> wrote:
 1. What if there are a very high number of users, like a million customers won't the service crash? Is it advisable to hold the data in memory.

2. What if state-functions are used to calculate the value ? How will this approach differ from the one proposed below. 

Regards,
Swagat

On Wed, Apr 21, 2021, 11:25 PM Arvid Heise <[hidden email]> wrote:
Hi Sunitha,

the approach you are describing sounds like you want to use a session window. [1] If you only want to count them if they happen at the same hour then, you want to use a tumbling window.

Your datastream approach looks solid.

For SQL, there is also a session (and tumbling) window [2]. You can see examples at the bottom of the section.


On Tue, Apr 20, 2021 at 11:03 PM [hidden email] <[hidden email]> wrote:
Hi All,

I have one requirement where I need to calculate total amount of transactions done by each each user in last 1 hour.
Say Customer1 has done 2 transactions one at 11:00am and other one at 11:20 am.
Customer2 has done 1 transaction one at 10:00 am 
Customer3 has done 3 transactions one at 11:20 am,  11:40 am and 11:45 am.

when ever customer does a transaction then we receive an event in source topic, we consume the data and need to calculate the total amount spent by Customer in last 1 hour.

if I have received customer1 new transaction event at 11:30 am then I need to calculate the sum of 3 events done by customer1 in last 1 hour (i.e 11:00 , 11:20 and 11:30 am - all 3 events fall in last 1 hour window)
Now say I receive Customer2 new transaction event at 11:30 am then for this customer I need to consider only one event 11:30 (ignoring the event at  10:00 am  as it does not fall in last 1 hr)
Customer3 new transaction is done at 12:40 pm then for this Customer I need to calculate sum of ( 11:40 am , 11:45 am and 12:40pm) as all 3 fall under last 1 hr.

Approach I am planning to try:
Every event has the transaction time which I am using as event time to assign WatermarkStrategy
KeyBy - customerId
SlidingEventTimeWindows of 1 hr 
then process all elements using ProcessWindowFunction


Kindly suggest the approach I need to follow to achieve the above scenario using Flink Java /Sql. I am using Flink 1.12.0.

Regards,
Sunitha

Re: Flink Event specific window

Swagat Mishra
Hi Arvid,

I sent a separate mail titled "Session Windows - not working as expected" (to the user community).

All other details are here if you need them; closing this thread.

Please have a look when you have a few minutes, much appreciated.

Regards,
Swagat

On Thu, May 6, 2021 at 1:50 AM Swagat Mishra <[hidden email]> wrote:
Hi Arvid,

I sent a separate mail titled "Session Windows - not working as expected".

closing this thread.

Please have a look when you have a few minutes, much appreciated.

Regards,
Swagat


On Wed, May 5, 2021 at 7:24 PM Swagat Mishra <[hidden email]> wrote:
Hi Arvid,

Tried a small POC to reproduce the behaviour, but somehow I don't see the process function getting called. Am I doing something wrong?

customerStream
        .keyBy(Customer::getIdentifier)
        .window(EventTimeSessionWindows.withGap(Time.seconds(8)))
        .process(new CustomAggregateFunction())
        .print();
The process function looks like below:
public class CustomAggregateFunction extends ProcessWindowFunction<Customer, CustomerAggregate, String, TimeWindow> {

    @Override
    public void process(String key, Context context, Iterable<Customer> iterable, Collector<CustomerAggregate> collector) throws Exception {
        System.out.println("in aggregation");
    }
}
The customer generator:
public class CustomerGenerator implements SourceFunction<Customer> {

    volatile boolean isRunning = true;

    private String[] CUSTOMER_KEY = {"C1", "C2", "C3", "C4", "C5"};

    @Override
    public void run(SourceContext<Customer> sourceContext) throws Exception {
        int counter = 1;

        while (isRunning) {
            Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000);
            System.out.println("Writing customer: " + c);
            sourceContext.collect(c);
            Thread.sleep(1000);
            counter++;
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

Customer object
public class Customer {
    private String identifier;
    private LocalTime eventTime;
    private double amount;

    public Customer(String identifier, LocalTime eventTime, double amount) {
        this.identifier = identifier;
        this.amount = amount;
        this.eventTime = eventTime;
    }

    public String getIdentifier() {
        return identifier;
    }

    public LocalTime getEventTime() {
        return eventTime;
    }

    public double getAmount() {
        return amount;
    }

    @Override
    public String toString() {
        return "Customer{" +
                "identifier='" + identifier + '\'' +
                ", eventTime=" + eventTime +
                ", amount=" + amount +
                '}';
    }
}
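One possible reason the process function is never called (an assumption, not confirmed in this thread): the source never assigns event-time timestamps or watermarks, and an event-time session window only fires once the watermark passes the end of the session (last event + gap). A minimal plain-Java sketch of that firing rule, with illustrative names rather than the Flink API:

```java
import java.util.List;

// Plain-Java sketch (not the Flink API) of how an event-time session window fires:
// the watermark trails the maximum seen timestamp by an out-of-orderness bound,
// and a session only fires once the watermark passes lastEvent + gap.
public class SessionFireSketch {
    static long watermark(List<Long> seenTimestamps, long outOfOrdernessMs) {
        // Bounded out-of-orderness: watermark = max timestamp seen - bound.
        long max = Long.MIN_VALUE;
        for (long ts : seenTimestamps) max = Math.max(max, ts);
        return max - outOfOrdernessMs;
    }

    static boolean sessionFires(long lastEventTs, long gapMs, long watermark) {
        // The session window ends gapMs after its last event.
        return watermark >= lastEventTs + gapMs;
    }

    public static void main(String[] args) {
        // Events at t=0s and t=5s, gap 8s, no out-of-orderness bound.
        List<Long> ts = List.of(0L, 5_000L);
        long wm = watermark(ts, 0);
        System.out.println(wm);                               // 5000
        System.out.println(sessionFires(5_000L, 8_000L, wm)); // false: 5000 < 13000
        // Only once something advances the watermark past lastEvent + gap
        // does the window fire and process() get called.
        System.out.println(sessionFires(5_000L, 8_000L, 14_000L)); // true
    }
}
```

If no timestamps/watermarks are assigned, the watermark never advances and the window never fires, which would match the observed behaviour.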


On Thu, Apr 29, 2021 at 5:46 PM Arvid Heise <[hidden email]> wrote:
Hi Swagat,

1. Where the data primarily resides depends on the chosen state backend [1]. In most cases, it's written to some file with a memory cache. It's possible to query the state [2], but not with SQL. In fact, it's so basic that we decided to drop the feature in the future to make room for a more sophisticated solution based around replicating the state to an external queryable form, but there is nothing specific yet.
2. It would help if you (re)read the section about state persistence. [3] Basically, the state is updated on every write access of the process function. Flink creates a checkpoint of the state periodically and can recover from these checkpoints. It's also possible to look into these checkpoints with the state processor API [4].
3. It's embedded. See above for what happens on failure.


On Mon, Apr 26, 2021 at 10:43 AM Swagat Mishra <[hidden email]> wrote:
Hi Arvid,

On 2 - I was referring to stateful functions as an alternative to windows, but in this particular use case it's not an exact fit, I think, though a solution can be built around it.

On the overall approach here, what's the right way to use Flink SQL:
Every event has the transaction time which I am using as event time to assign WatermarkStrategy
KeyBy - customerId
SlidingEventTimeWindows of 1 hr 
then process all elements using ProcessWindowFunction

Extending above..

For the session window, taking the above example, reiterated below:

Say Customer1 has done 2 transactions, one at 11:00 am and the other at 11:20 am.
Customer2 has done 1 transaction, at 10:00 am.
Customer3 has done 3 transactions, at 11:20 am, 11:40 am and 11:45 am.

1-hour windows:
9:30 AM - 10:30 AM: Customer 2
10:30 AM - 11:30 AM: Customer 1, Customer 3
11:30 AM - 12:30 PM: Customer 3
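The bucketing above corresponds to 1-hour tumbling windows with a 30-minute offset (an inference from the listed buckets, not something stated in the thread). A plain-Java sketch of the usual window-start rule, illustrative rather than the Flink API:

```java
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;

// Sketch: assign each transaction to a 1-hour tumbling bucket offset by 30 min,
// reproducing the 9:30-10:30 / 10:30-11:30 / 11:30-12:30 buckets listed above.
// Rule: windowStart = ts - ((ts - offset) mod size).
public class BucketSketch {
    static LocalTime bucketStart(LocalTime eventTime, long sizeMin, long offsetMin) {
        long minutes = ChronoUnit.MINUTES.between(LocalTime.MIDNIGHT, eventTime);
        long start = minutes - Math.floorMod(minutes - offsetMin, sizeMin);
        return LocalTime.MIDNIGHT.plusMinutes(start);
    }

    public static void main(String[] args) {
        System.out.println(bucketStart(LocalTime.of(10, 0), 60, 30));  // 09:30 -> Customer2
        System.out.println(bucketStart(LocalTime.of(11, 20), 60, 30)); // 10:30 -> Customer1, Customer3
        System.out.println(bucketStart(LocalTime.of(11, 40), 60, 30)); // 11:30 -> Customer3
    }
}
```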

Questions - how do we access the state?
  1. Will the process window function write to an in-memory SQL table that does not get flushed to a proper backing database, so all the data stays in memory? If yes, can that be queried?
  2. If the process window function writes to a proper backing database, at what point should this happen? Because the API can query the state at any point in time, the data that was flushed might be stale and need recomputation.
  3. How do you recommend RocksDB be used as a state backend? Is that the embedded RocksDB, or do you recommend an external implementation? Embedded RocksDB state is lost when the container is restarted, I guess, so we will have to have an external mechanism for restart/crash recovery?
Regards,
Swagat



On Mon, Apr 26, 2021 at 11:29 AM Arvid Heise <[hidden email]> wrote:
1. It always depends on the data volume per user. A million users is not much if you compare it to the biggest Flink installations (Netflix, Alibaba, Pinterest, ...). However, for larger scale and scalability, I'd recommend the RocksDB state backend. [1]

2. Are you referring to statefun? I'd say that for your use case, Flink is a better fit. Statefun is more suitable when each actor (=user in your case) acts differently depending on the data, like in a state machine. In your case, your users should be processed in the same way: even if the windows are independently opened and closed, every user has at most one window open at a given event time. You probably also aggregate all user states more or less in the same way.

Or did you refer to processing functions with state? That's certainly possible to implement, but it won't be much faster unless you can exploit some specific properties of your application. An example is written in [2]. I'd recommend first using the regular, built-in windows and only switching to custom code if the performance is insufficient. Custom implementations may be faster now, but they come with a higher maintenance cost, and the built-in windows may be better optimized in the future.

Lastly, if your query is of a relational nature, I'd recommend having a look at the Table API/SQL [3]. Unless you really invest a lot of time, you won't be able to write more efficient code than what the Table API generates.


On Sun, Apr 25, 2021 at 11:46 PM Swagat Mishra <[hidden email]> wrote:
 1. What if there are a very high number of users, like a million customers; won't the service crash? Is it advisable to hold the data in memory?

2. What if stateful functions are used to calculate the value? How will this approach differ from the one proposed below?

Regards,
Swagat

On Wed, Apr 21, 2021, 11:25 PM Arvid Heise <[hidden email]> wrote:
Hi Sunitha,

the approach you are describing sounds like you want to use a session window. [1] If you only want to count them when they fall within the same hour, then you want to use a tumbling window.

Your DataStream approach looks solid.

For SQL, there is also a session (and tumbling) window [2]. You can see examples at the bottom of the section.


On Tue, Apr 20, 2021 at 11:03 PM [hidden email] <[hidden email]> wrote:
Hi All,

I have one requirement where I need to calculate the total amount of transactions done by each user in the last 1 hour.
Say Customer1 has done 2 transactions, one at 11:00 am and the other at 11:20 am.
Customer2 has done 1 transaction, at 10:00 am.
Customer3 has done 3 transactions, at 11:20 am, 11:40 am and 11:45 am.

Whenever a customer does a transaction, we receive an event in the source topic; we consume the data and need to calculate the total amount spent by that customer in the last 1 hour.

If I receive a new Customer1 transaction event at 11:30 am, then I need to calculate the sum of the 3 events done by Customer1 in the last 1 hour (i.e. 11:00, 11:20 and 11:30 am; all 3 events fall in the last 1-hour window).
Now say I receive a new Customer2 transaction event at 11:30 am; then for this customer I need to consider only the one event at 11:30 (ignoring the event at 10:00 am, as it does not fall in the last 1 hr).
A new Customer3 transaction is done at 12:40 pm; then for this customer I need to calculate the sum of 11:40 am, 11:45 am and 12:40 pm, as all 3 fall under the last 1 hr.
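The per-customer "last 1 hour" totals described above can be sketched in plain Java; the transaction amounts below are hypothetical, since the thread does not give them:

```java
import java.time.LocalTime;
import java.util.List;

// Sketch of the required computation: on each new transaction, sum that
// customer's transactions from the last hour (amounts are hypothetical).
public class LastHourSum {
    record Txn(String customer, LocalTime time, double amount) {}

    static double lastHourTotal(List<Txn> history, Txn latest) {
        LocalTime cutoff = latest.time().minusHours(1);
        double total = latest.amount();
        for (Txn t : history) {
            // Count only this customer's events from the last hour.
            if (t.customer().equals(latest.customer()) && !t.time().isBefore(cutoff)) {
                total += t.amount();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Txn> history = List.of(
                new Txn("Customer1", LocalTime.of(11, 0), 100),
                new Txn("Customer1", LocalTime.of(11, 20), 100),
                new Txn("Customer2", LocalTime.of(10, 0), 100));
        // Customer1 at 11:30: 11:00 and 11:20 both fall in the last hour -> 300.0
        System.out.println(lastHourTotal(history, new Txn("Customer1", LocalTime.of(11, 30), 100)));
        // Customer2 at 11:30: the 10:00 event is outside the last hour -> 100.0
        System.out.println(lastHourTotal(history, new Txn("Customer2", LocalTime.of(11, 30), 100)));
    }
}
```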

Approach I am planning to try:
Every event has the transaction time, which I am using as the event time to assign a WatermarkStrategy
KeyBy - customerId
SlidingEventTimeWindows of 1 hr 
then process all elements using ProcessWindowFunction
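As a side note on the sliding-window step above: a sliding window assigns each event to size/slide overlapping windows. A plain-Java sketch of that assignment (not the Flink API; the slide is not specified in the thread, so 10 minutes is assumed for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: with a sliding window of size 60 min and an assumed slide of 10 min,
// each event is copied into size/slide = 6 overlapping windows.
public class SlidingAssignSketch {
    static List<Long> windowStarts(long tsMin, long sizeMin, long slideMin) {
        List<Long> starts = new ArrayList<>();
        // Latest window containing the event starts at the last slide boundary.
        long lastStart = tsMin - Math.floorMod(tsMin, slideMin);
        for (long s = lastStart; s > tsMin - sizeMin; s -= slideMin) {
            starts.add(s);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An 11:30 event (minute 690) lands in windows starting 11:30, 11:20, ..., 10:40.
        List<Long> starts = windowStarts(690, 60, 10);
        System.out.println(starts.size()); // 6
        System.out.println(starts.get(0)); // 690 (11:30)
        System.out.println(starts.get(5)); // 640 (10:40)
    }
}
```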


Kindly suggest the approach I need to follow to achieve the above scenario using Flink Java/SQL. I am using Flink 1.12.0.

Regards,
Sunitha