Appending Windowed Aggregates to Events

Appending Windowed Aggregates to Events

Tim Stearn

Hello All,

 

I’m *very* new to Flink.  I read through the documentation and played with some sample code, but I’m struggling to get started with my requirements.

 

We want to use Flink to maintain windowed aggregates as part of a transaction monitoring application.  These would use sliding window definitions.  An example would be:  “Total amount for CASH transactions in the last 5 days”.   Here’s what I need my Flink application to do:

1. Prepare for transaction processing by reading historical aggregates and building windows

2. For each new transaction:

   a. Update the windowed aggregate with the new transaction data

   b. Find the window that matches the incoming timestamp and add the aggregate value to the transaction

   c. Send the enhanced transaction (original fields + aggregates from the matching window) to a downstream processor via a RabbitMQ or Kafka sink

 

For every transaction coming in, I want one (and only one) output that contains the original transaction fields plus the aggregates.

 

I see how to write the code that creates the window assigner and the code that incrementally maintains the aggregates.  I’m not sure how to join this back to the original transaction record, appending the aggregate values from the window that matches the transaction timestamp.  This seems like a join of some kind to me, but I don’t know how to implement it in Flink.

 

I’m hoping someone can reply with some simple code (or even pseudocode) to get me started on the “join” part of the above data flow.  Please let me know if I need to clarify anything.
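For concreteness, here is a skeleton (in Java) of the data flow I’m describing. Every name in it is a hypothetical placeholder, and the flatMap operator is exactly the “join” piece I’m asking about:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Hypothetical source of transaction events
    DataStream<Transaction> transactions = env.addSource(transactionSource);

    DataStream<EnrichedTransaction> enriched = transactions
            .keyBy(txn -> txn.getAccount())       // partition by the aggregation key
            .flatMap(new EnrichWithAggregate());  // steps 2a + 2b: the open question

    enriched.addSink(kafkaOrRabbitSink);          // step 2c: ship enriched events downstream

    env.execute("Transaction enrichment");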

 

Thanks,

 

Tim Stearn

 


Re: Appending Windowed Aggregates to Events

Fabian Hueske-2
Hi Tim,

Your requirements seem similar to what you can do in SQL with an OVER window aggregation.
OVER window aggregates are computed for each row and enrich the existing fields with aggregates. The aggregates are computed over a range of rows that precede (or follow) the current row. This means that each row possibly has a different aggregate.

Flink's DataStream API has no built-in support for these windows and you would need to implement the functionality yourself based on a ProcessFunction. This is definitely possible, but a bit involved if you want to make it efficient.
Flink's Table API and SQL integration has support for OVER windows (Table API [1], SQL [2]).

Hope this helps.

Best,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/tableApi.html#over-windows
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#aggregations
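As a minimal sketch, an OVER window version of the “CASH in the last 5 days” aggregate could look like the following against the 1.3-era APIs (the stream txns, the field names, and the Transaction type are invented for illustration; the stream needs event-time timestamps and watermarks assigned, and tEnv.sql(...) was renamed sqlQuery(...) in later releases):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    // "rowtime.rowtime" exposes the event-time attribute to SQL
    tEnv.registerDataStream("transactions", txns, "account, txnType, amount, rowtime.rowtime");

    Table enriched = tEnv.sql(
        "SELECT account, txnType, amount, " +
        "  SUM(CASE WHEN txnType = 'CASH' THEN amount ELSE 0 END) " +
        "    OVER (PARTITION BY account ORDER BY rowtime " +
        "          RANGE BETWEEN INTERVAL '5' DAY PRECEDING AND CURRENT ROW) " +
        "    AS cashTotal5d " +
        "FROM transactions");

The CASE expression restricts the sum to CASH transactions while still emitting every row. Each input row comes out exactly once, extended with the 5-day aggregate, which matches the “one and only one output per transaction” requirement.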



Re: Appending Windowed Aggregates to Events

Jain, Ankit
In reply to this post by Tim Stearn

You could load the historical data into Flink state and then look up the state with a key derived from the input record.

That would serve the same purpose as a join in the relational world.
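As a minimal sketch of that idea (assuming hypothetical Transaction / EnrichedTransaction types), a keyed function can update the aggregate in state and emit exactly one enriched record per input:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class EnrichWithAggregate
            extends RichFlatMapFunction<Transaction, EnrichedTransaction> {

        private transient ValueState<Double> cashTotal;  // per-key running aggregate

        @Override
        public void open(Configuration parameters) {
            cashTotal = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("cashTotal", Double.class));
        }

        @Override
        public void flatMap(Transaction txn, Collector<EnrichedTransaction> out)
                throws Exception {
            double current = cashTotal.value() == null ? 0.0 : cashTotal.value();
            double updated = "CASH".equals(txn.getType())
                    ? current + txn.getAmount() : current;
            cashTotal.update(updated);                          // 2a: update the aggregate
            out.collect(new EnrichedTransaction(txn, updated)); // 2b/2c: enrich and emit
        }
    }

    // Usage: transactions.keyBy("account").flatMap(new EnrichWithAggregate());

Note this only keeps a running total; actually expiring data that falls out of the sliding window (the “last 5 days” part) is the involved bit Fabian mentioned: you would need to buffer timestamped amounts (e.g., per-day buckets in MapState) and subtract expired buckets, typically driven by timers in a ProcessFunction.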

 

You may also want to think about keeping the writes and the querying isolated.

Especially if your windows are going to be long (e.g., CASH transactions over the last 6 months, to extend your example) and you need your data to persist long term, a durable store outside of Flink will really help.

 

Flink’s state feature is really nice, but I wouldn’t view it as long-term durable storage the way a NoSQL store or a relational database like Oracle is.
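If the aggregates do live in an external store, Flink’s async I/O operator is the usual way to do the per-event lookup without blocking the pipeline. A hedged sketch (the store client and types are placeholders; this uses the later ResultFuture API, while Flink 1.3, current when this thread was written, called the callback AsyncCollector):

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    public class AggregateLookup
            extends RichAsyncFunction<Transaction, EnrichedTransaction> {

        @Override
        public void asyncInvoke(Transaction txn, ResultFuture<EnrichedTransaction> result) {
            CompletableFuture
                    .supplyAsync(() -> queryStore(txn.getAccount()))  // real code would use the store's async client
                    .thenAccept(total -> result.complete(
                            Collections.singleton(new EnrichedTransaction(txn, total))));
        }

        // Placeholder for a read against the external store (HBase, Cassandra, ...)
        private double queryStore(String account) {
            return 0.0;
        }
    }

    // Usage: bound in-flight requests and time out slow lookups
    // (orderedWait would preserve input order instead):
    // DataStream<EnrichedTransaction> enriched = AsyncDataStream.unorderedWait(
    //         transactions, new AggregateLookup(), 1000, TimeUnit.MILLISECONDS, 100);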

 

Thanks

Ankit

 


Re: Appending Windowed Aggregates to Events

Ted Yu
In reply to this post by Tim Stearn
In case a NoSQL store is considered, please take a look at HBase.

Cheers
