Implement Joins with Lookup Data


Harshvardhan Agrawal
Hi,

We are using Flink for financial data enrichment and aggregations. We have Positions data that we currently receive from Kafka, and we want to enrich it with reference data such as Product and Account information that lives in a relational database. From my understanding of Flink so far, I think there are two ways to achieve this:

1) First Approach:
a) Get positions from Kafka and key by product key.
b) Perform a lookup in the database for each key and obtain a Tuple2<Position, Product>.

2) Second Approach:
a) Get positions from Kafka and key by product key.
b) Window the keyed stream into tumbling windows of, say, 15 seconds.
c) For each window, collect the unique product keys and perform a single lookup.
d) Somehow join the Positions and Products.

In the first approach we would be making a lot of calls to the DB, so the solution is very chatty. It is hard to scale because the database storing the reference data might not be very responsive.

In the second approach, I wanted to join the WindowedStream with the SingleOutputStreamOperator, but it turns out you can't join a windowed stream, so I am not quite sure how to do that.

I wanted an opinion on what the right thing to do is. Should I go with the first approach or the second one? And if the second one, how can I implement the join?
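
For context, here is roughly what I had in mind for the first approach (just a sketch, not tested; Position, Product, the getters and the SQL are simplified placeholders):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// One blocking DB lookup per incoming position.
public class ProductLookup extends RichMapFunction<Position, Tuple2<Position, Product>> {

    private transient Connection connection;
    private transient PreparedStatement lookup;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection("jdbc:...", "user", "password");
        lookup = connection.prepareStatement("SELECT * FROM products WHERE product_key = ?");
    }

    @Override
    public Tuple2<Position, Product> map(Position position) throws Exception {
        lookup.setString(1, position.getProductKey());
        try (ResultSet rs = lookup.executeQuery()) {
            rs.next();
            return Tuple2.of(position, Product.fromResultSet(rs)); // Product.fromResultSet is a placeholder
        }
    }

    @Override
    public void close() throws Exception {
        if (lookup != null) lookup.close();
        if (connection != null) connection.close();
    }
}

// usage: positions.keyBy(Position::getProductKey).map(new ProductLookup())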

--
Regards,
Harshvardhan Agrawal
Reply | Threaded
Open this post in threaded view
|

Re: Implement Joins with Lookup Data

Jörn Franke
For the first one (lookups of single entries) you could use a NoSQL DB (e.g. a key-value store); a relational database will not scale.

Depending on when you need to do the enrichment, you could also store the data first and enrich it later as part of a batch process.


Re: Implement Joins with Lookup Data

Jain, Ankit

How often is the product DB updated? Based on that, you can store the product metadata as state in Flink, e.g. set it up on cluster startup and then refresh it daily.
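
A rough, untested sketch of that idea could be a KeyedProcessFunction that caches the product in keyed state and refreshes it with a processing-time timer (Position, Product and ProductService.fetch() are placeholders):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ProductCacheEnricher
        extends KeyedProcessFunction<String, Position, Tuple2<Position, Product>> {

    private static final long REFRESH_INTERVAL_MS = 24 * 60 * 60 * 1000L; // daily refresh

    private transient ValueState<Product> product;

    @Override
    public void open(Configuration parameters) {
        product = getRuntimeContext().getState(
                new ValueStateDescriptor<>("product", Product.class));
    }

    @Override
    public void processElement(Position position, Context ctx,
                               Collector<Tuple2<Position, Product>> out) throws Exception {
        Product p = product.value();
        if (p == null) {
            // first position for this product key: load it once and schedule a refresh
            p = ProductService.fetch(position.getProductKey()); // placeholder lookup
            product.update(p);
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + REFRESH_INTERVAL_MS);
        }
        out.collect(Tuple2.of(position, p));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<Position, Product>> out) throws Exception {
        // periodic refresh of the cached product for this key
        product.update(ProductService.fetch(ctx.getCurrentKey()));
        ctx.timerService().registerProcessingTimeTimer(timestamp + REFRESH_INTERVAL_MS);
    }
}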

 

Also, based on this feature alone, Flink doesn't seem to add a lot of value on top of Kafka. As Jörn said below, you could very well store all the events in an external store and then periodically run a batch/cron job to enrich them later, since your processing doesn't seem to require absolute real time.

 

Thanks

Ankit

 


Re: Implement Joins with Lookup Data

Harshvardhan Agrawal
Hi,

Thanks for your responses.

There is no fixed interval at which the data is updated. It's more that onboarding a new product or a change in mandates will trigger the reference data to change.

It's not just the enrichment we are doing here. Once we have enriched the data, we will be performing a bunch of aggregations on it.

Which approach would you recommend?

Regards,
Harshvardhan


Re: Implement Joins with Lookup Data

Till Rohrmann
Hi Harshvardhan,

I agree with Ankit that this problem could actually be solved quite elegantly with Flink's state. If you can ingest the product/account information changes as a stream, you can keep the latest version in Flink state by using a co-map function [1, 2]. One input of the co-map function would be the product/account update stream, which updates the respective entries in Flink's state; the other input is the stream to be enriched. When receiving an event from that stream, you would look up the latest information in the operator's state and join it with the incoming event.
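
A minimal sketch of that pattern (untested; it assumes Position/Product types and that both streams are keyed by the product key):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ProductJoin
        extends RichCoFlatMapFunction<Position, Product, Tuple2<Position, Product>> {

    private transient ValueState<Product> latestProduct;

    @Override
    public void open(Configuration parameters) {
        latestProduct = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-product", Product.class));
    }

    // input 1: positions, enriched with the latest product seen for this key
    @Override
    public void flatMap1(Position position, Collector<Tuple2<Position, Product>> out) throws Exception {
        Product p = latestProduct.value();
        if (p != null) {
            out.collect(Tuple2.of(position, p));
        }
        // else: product not known yet -- drop, buffer or side-output, depending on requirements
    }

    // input 2: product updates only refresh the state
    @Override
    public void flatMap2(Product product, Collector<Tuple2<Position, Product>> out) throws Exception {
        latestProduct.update(product);
    }
}

// wiring (both inputs keyed by the product key):
// positions.connect(productUpdates)
//          .keyBy(Position::getProductKey, Product::getKey)
//          .flatMap(new ProductJoin());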


Cheers,
Till


Re: Implement Joins with Lookup Data

Harshvardhan Agrawal
Hi Till,

How would we do the initial hydration of the Product and Account data, since it is currently in a relational DB? Do we have to copy the data over to Kafka and then consume it from there?

Regards,
Harsh


Re: Implement Joins with Lookup Data

Till Rohrmann
Yes, a Kafka topic that you seed with the initial values, and to which you then feed changes, could be a solution; the Flink job would consume from that topic.
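
For the seeding step, a one-off bootstrap job could be as simple as the following sketch (plain JDBC plus a Kafka producer; topic name, query, columns and serialization are placeholders):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class ProductBootstrap {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Connection conn = DriverManager.getConnection("jdbc:...", "user", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT product_key, payload FROM products");
             KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {

            while (rs.next()) {
                // key by product key so later updates for the same product land in the same partition
                producer.send(new ProducerRecord<>("product-updates",
                        rs.getString("product_key"), rs.getString("payload")));
            }
            producer.flush();
        }
    }
}

After the topic is seeded, only change events need to be written to it going forward.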


Re: Implement Joins with Lookup Data

Elias Levy
Alas, this suffers from the bootstrap problem. At the moment Flink does not allow you to pause a source (the positions), so you can't fully consume and preload the accounts or products to perform the join before the positions start flowing. Additionally, Flink SQL does not support materializing an upsert table for the accounts or products to perform the join, so you have to develop your own KeyedProcessFunction, maintain the state, and perform the join yourself if you only want to join against the latest value for each key.
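
For illustration, such a hand-rolled join could be a CoProcessFunction over the two keyed streams that also buffers positions arriving before their product, which softens the bootstrap problem a little (untested sketch; Position/Product are placeholders):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// use on connected streams that are keyed by the product key on both inputs
public class BufferingProductJoin
        extends CoProcessFunction<Position, Product, Tuple2<Position, Product>> {

    private transient ValueState<Product> latestProduct;
    private transient ListState<Position> pendingPositions;

    @Override
    public void open(Configuration parameters) {
        latestProduct = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-product", Product.class));
        pendingPositions = getRuntimeContext().getListState(
                new ListStateDescriptor<>("pending-positions", Position.class));
    }

    @Override
    public void processElement1(Position position, Context ctx,
                                Collector<Tuple2<Position, Product>> out) throws Exception {
        Product p = latestProduct.value();
        if (p != null) {
            out.collect(Tuple2.of(position, p));
        } else {
            // product not seen yet: park the position until it arrives
            pendingPositions.add(position);
        }
    }

    @Override
    public void processElement2(Product product, Context ctx,
                                Collector<Tuple2<Position, Product>> out) throws Exception {
        latestProduct.update(product);
        for (Position buffered : pendingPositions.get()) {
            out.collect(Tuple2.of(buffered, product));
        }
        pendingPositions.clear();
    }
}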


Re: Implement Joins with Lookup Data

Ashish Pokharel
BTW, 

We got around the bootstrap problem for a similar use case by using a “nohup” topic as the input stream. Our CI/CD pipeline currently passes an initialize option to the app if there is a need to bootstrap, waits for X minutes before taking a savepoint, and then restarts the app normally, listening to the right topic(s). I believe there is also work underway to handle this gracefully using Side Inputs. Other than determining the X minutes needed for initialization to complete, we haven't had any issues with this solution: we have over 40 million state refreshes daily and close to 200 Mbps of input streams being joined to state.

Hope this helps!


- Ashish


Re: Implement Joins with Lookup Data

Harshvardhan Agrawal
What happens when one of your workers dies? Say the machine dies and is not recoverable. How do you recover from that situation? Will the pipeline die, forcing you to go through the entire bootstrap process again?


Re: Implement Joins with Lookup Data

Ashish Pokharel
The app is checkpointing, so it will pick up from the last checkpoint if an operator fails. I suppose you mean a TM completely crashing; even in that case another TM would spin up and it “should” pick up from the checkpoint. We are running on YARN, but I would assume TM recovery is possible in any other cluster setup. I haven't tested this specifically during the init phase, but we have killed TMs during normal stateful processing as a test case and I don't remember seeing an issue.


- Ashish


Re: Implement Joins with Lookup Data

Michael Gendelman
In reply to this post by Ashish Pokharel
Hi Ashish,

We are planning for a similar use case and I was wondering if you could share the amount of resources you have allocated for this flow?

Thanks,
Michael


Re: Implement Joins with Lookup Data

Ashish Pokharel
Hi Michael,

We are currently using 15 TMs with 4 cores and 4 slots each, and 10 GB of memory per TM. We have 15 Kafka partitions for the main stream and 6 for the smaller context stream. Heap usage is around 50%, GC pauses are about 150 ms, and CPU load is low. We may be able to reduce the resources for this if need be.

Thanks,


- Ashish


Re: Implement Joins with Lookup Data

Harshvardhan Agrawal
Hello Everyone,

Sorry for the delayed response.
Here is what I am thinking of doing: we plan to create two pipelines. The first one only enriches the position data with product information. The second pipeline uses the enriched positions and gets the account information needed to perform the aggregations.

First Pipeline:
1) Get the positions from Kafka and window the data into tumbling windows of 30 seconds.
2) Perform a rolling aggregation that collects all the unique product keys into a set.
3) At the end of the window, a process function queries an external service, performing a single lookup for all the unique products seen in the window (a rough sketch of steps 1-3 follows after this list).
4) Persist the enriched positions to Kafka topic T1. A sink process reads from T1, writes to an underlying DB, and persists to another Kafka topic (T2) for the second pipeline to read from.
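
The sketch (untested; Position, EnrichedPosition and ReferenceDataClient.lookup() are placeholders, and it folds steps 2 and 3 into one window function; note that windowAll() runs with parallelism 1):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class WindowedEnrichment {

    public static DataStream<EnrichedPosition> enrich(DataStream<Position> positions) {
        return positions
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30)))
                .process(new ProcessAllWindowFunction<Position, EnrichedPosition, TimeWindow>() {
                    @Override
                    public void process(Context ctx, Iterable<Position> elements,
                                        Collector<EnrichedPosition> out) {
                        List<Position> buffered = new ArrayList<>();
                        Set<String> productKeys = new HashSet<>();
                        for (Position p : elements) {
                            buffered.add(p);
                            productKeys.add(p.getProductKey());
                        }
                        // one batch lookup per window for all distinct product keys
                        Map<String, Product> products = ReferenceDataClient.lookup(productKeys);
                        for (Position p : buffered) {
                            out.collect(new EnrichedPosition(p, products.get(p.getProductKey())));
                        }
                    }
                });
    }
}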

Second Pipeline:
1) Read from topic T2, which contains the enriched positions.
2) For each position, get the account information and look up all the parent and child accounts associated with that account.
3) Once we have all the accounts, look up all the enriched positions that were created by the first pipeline for those accounts.
4) Perform the final aggregation, e.g. calculating the Net Asset Value for each account.
5) Persist the output to the DB.

Regards,
Harsh

Reply | Threaded
Open this post in threaded view
|

Re: Implement Joins with Lookup Data

Hequn Cheng
Hi Harshvardhan,

Have you ever considered adding a cache for the database lookups, so that you don't have to add so many pipelines and don't have to do the window distinct?
The cache can be an LRU cache with a maximum size and expiry time specified.
If your reference data is limited, it can also be a full-data cache that is refreshed periodically, say every 2 hours, according to your requirements.

Adding a cache can not only simplify your pipeline but also improve the job's performance (a rough sketch is below).
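For example, a minimal sketch of such a cache with Guava inside a RichMapFunction might look like the following, assuming a hypothetical ProductDao.lookup(key) that performs the per-key database query (Position, Product, ProductDao and the concrete size/expiry values are placeholders):

import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class CachedProductEnricher extends RichMapFunction<Position, Tuple2<Position, Product>> {

    private transient LoadingCache<String, Product> cache;

    @Override
    public void open(Configuration parameters) {
        cache = CacheBuilder.newBuilder()
            .maximumSize(100_000)                  // LRU bound on the number of entries
            .expireAfterWrite(2, TimeUnit.HOURS)   // bounded staleness for reference data
            .build(new CacheLoader<String, Product>() {
                @Override
                public Product load(String productKey) {
                    // Hits the database only on a cache miss or after expiry.
                    return ProductDao.lookup(productKey);
                }
            });
    }

    @Override
    public Tuple2<Position, Product> map(Position position) throws Exception {
        return Tuple2.of(position, cache.get(position.getProductKey()));
    }
}

With expireAfterWrite the entries are re-read from the database at most every two hours, so reference-data changes show up with bounded staleness while most lookups stay in memory.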

Best, Hequn


Reply | Threaded
Open this post in threaded view
|

Re: Implement Joins with Lookup Data

Harshvardhan Agrawal
Hi Hequn,

We considered that, but unfortunately we have a lot of reference data and would need an enormous amount of memory to hold it. As a proof of concept I added a Guava cache and it did improve performance, but it cannot hold all of our reference data. We have a lot of use cases where we want to join position data with Account, Product, Exchange Rate, etc. The joins can easily span several datasets in order to obtain the final enriched information.
Now, if I were to keep an external cache, say something like Ignite, I would need some service that constantly hits the cache for every position, which makes the pipeline super chatty. Hence we thought the windowing approach would help us control that chattiness.

I like Till's solution of connecting streams and using CoFlatMap. I can also see an example on the data Artisans training website (http://training.data-artisans.com/exercises/eventTimeJoin.html#). What I don't get is how this would work when more than two datasets are involved. In my case, say I wanted to enrich Positions using the Account, Product and Exchange Rate datasets.

Regards,
Harsh

Reply | Threaded
Open this post in threaded view
|

Re: Implement Joins with Lookup Data

Hequn Cheng
Hi Harsh,

> What I don't get is how this would work when more than two datasets are involved?
If you can ingest the product/account/rate information changes as a stream, I think there are two ways to enrich the positions:
  • One way is to connect multiple times: Positions connect Account, then connect Product, then connect Rate (a rough sketch of this is below).
  • Alternatively, unify the schemas and union Account, Product and Rate before connecting: Positions connect (Account union Product union Rate).
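For the first option, the wiring could look roughly like the sketch below. This is only an illustration: positions, accountUpdates, productUpdates and rateUpdates are assumed DataStreams, and PositionWithAccount, EnrichWithAccount, etc. are hypothetical types, where each EnrichWith* function is a CoFlatMapFunction that keeps the latest reference record for its key in keyed state, as Till described earlier in the thread.

// Each connect keys both streams on the same join key, so the co-function's
// keyed state holds the latest reference record for that key.
DataStream<PositionWithAccount> withAccount = positions
    .keyBy(p -> p.getAccountKey())
    .connect(accountUpdates.keyBy(a -> a.getAccountKey()))
    .flatMap(new EnrichWithAccount());

DataStream<PositionWithProduct> withProduct = withAccount
    .keyBy(p -> p.getProductKey())
    .connect(productUpdates.keyBy(prod -> prod.getProductKey()))
    .flatMap(new EnrichWithProduct());

DataStream<EnrichedPosition> fullyEnriched = withProduct
    .keyBy(p -> p.getCurrency())
    .connect(rateUpdates.keyBy(r -> r.getCurrency()))
    .flatMap(new EnrichWithRate());

For the second option, the reference records would be mapped to a common wrapper type before the union, and a single co-function would dispatch on the record type.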
Best, Hequn

Reply | Threaded
Open this post in threaded view
|

Re: Implement Joins with Lookup Data

Till Rohrmann
Hequn is right. If you know the maximum delay of your position corrections, then you need to buffer the enrichment information for at least that long (a rough sketch is below).
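As a rough illustration of that buffering (not a drop-in implementation), assuming the positions and product updates are connected and keyed by product key, a CoProcessFunction could keep the reference record in keyed state and only clear it after the maximum correction delay has passed. Position, Product and the one-hour delay below are placeholders.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class BufferedProductEnrichment
        extends CoProcessFunction<Position, Product, Tuple2<Position, Product>> {

    private static final long MAX_CORRECTION_DELAY_MS = 60 * 60 * 1000L; // assumed 1 hour

    private transient ValueState<Product> latestProduct;
    private transient ValueState<Long> expiryTime;

    @Override
    public void open(Configuration parameters) {
        latestProduct = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-product", Product.class));
        expiryTime = getRuntimeContext().getState(
                new ValueStateDescriptor<>("product-expiry", Long.class));
    }

    @Override
    public void processElement1(Position position, Context ctx,
                                Collector<Tuple2<Position, Product>> out) throws Exception {
        // Late corrections re-use whatever enrichment data is still buffered for this key.
        out.collect(Tuple2.of(position, latestProduct.value()));
    }

    @Override
    public void processElement2(Product product, Context ctx,
                                Collector<Tuple2<Position, Product>> out) throws Exception {
        latestProduct.update(product);
        // Keep the reference record at least as long as corrections can still arrive.
        long expiry = ctx.timerService().currentProcessingTime() + MAX_CORRECTION_DELAY_MS;
        expiryTime.update(expiry);
        ctx.timerService().registerProcessingTimeTimer(expiry);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<Position, Product>> out) throws Exception {
        // Only the most recently scheduled timer clears the state.
        Long expiry = expiryTime.value();
        if (expiry != null && timestamp >= expiry) {
            latestProduct.clear();
            expiryTime.clear();
        }
    }
}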

Cheers,
Till
