Caching

Navneeth Krishnan
Hi All,

We have a Flink streaming job processing around 200k events per second. The job relies on a lot of slowly changing reference data (mostly static, but with some changes over time, say about 5% once per day). There are about 12 caches; some contain approximately 20k entries, while a few hold about 2 million entries.

In the current implementation we use an in-memory, lazily loaded static cache to hold the data, and the initialization happens in the open function. We chose this approach because we have allocated around 4 GB of extra memory per TM for these caches, and if a TM has 6 slots the cache can be shared among them.
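
A minimal sketch of the kind of JVM-wide, lazily loaded cache described above, using only the JDK. The names `SharedLookupCache`, `LOADS`, and the `loader` function are hypothetical and not taken from the original job; the loader stands in for whatever backend call actually fetches a value.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical JVM-wide cache: one static map per TaskManager process,
// visible to every slot (thread) that runs in that JVM.
final class SharedLookupCache {
    private static final Map<String, String> CACHE = new ConcurrentHashMap<>();
    // Counts backend loads so the effect of sharing is observable.
    static final AtomicInteger LOADS = new AtomicInteger();

    private SharedLookupCache() {}

    // Loads a value on first access; later calls from any slot are served from memory.
    // The loader must return a non-null value and must not touch the cache itself.
    static String get(String key, Function<String, String> loader) {
        return CACHE.computeIfAbsent(key, k -> {
            LOADS.incrementAndGet();
            return loader.apply(k);
        });
    }
}
```

Because the map is static, it is shared only among tasks whose classes were loaded by the same classloader in the same JVM, and it starts empty after every restart, which is exactly the warm-up cost discussed in this thread.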

The issue with this approach is that every time a container is restarted or a new job is deployed, the cache has to be populated again. Sometimes this lazy loading takes a while, and it causes back pressure as well. We were thinking of moving this logic to a broadcast stream, but since the data would then have to be stored per slot, it would increase memory consumption considerably.

Another option we are considering is to replace the current near/far cache, which loads its data through a REST API, with a Redis-based near/far cache. This would definitely reduce the overall loading time, but it is still not a perfect solution.
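
For readers unfamiliar with the term, a near/far cache can be sketched roughly as follows: a small in-process "near" map sits in front of a slower "far" store. This is only an illustration under assumptions; `NearFarCache` and its `far` function are hypothetical, and the far tier here is a plain function standing in for a REST or Redis lookup.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Two-tier lookup: a bounded in-process "near" map in front of a slower "far" store.
final class NearFarCache<K, V> {
    private final Map<K, V> near;
    private final Function<K, V> far; // stand-in for a Redis or REST lookup

    NearFarCache(int maxNearEntries, Function<K, V> far) {
        this.far = far;
        // Access-ordered LinkedHashMap that evicts the least recently used entry.
        this.near = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxNearEntries;
            }
        };
    }

    // Simplification: the lock is held during the far lookup, so misses are serialized.
    synchronized V get(K key) {
        V value = near.get(key);
        if (value == null) {
            value = far.apply(key);   // near miss: ask the far tier
            if (value != null) {
                near.put(key, value); // keep it locally for the next access
            }
        }
        return value;
    }
}
```

Swapping the far tier from REST to Redis only changes how expensive a near miss is; the near tier still starts cold after a restart.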

Are there any recommendations on how this can be achieved effectively? Also, how is everyone else overcoming this problem?

Thanks,
Navneeth

Re: Caching

Prasanna kumar
Navneeth, 

Thanks for posting this question.

This looks like a scenario we might end up in ourselves.

We are working on a similar problem, with two differences.

1) The cache items do not change frequently (at most once per month, or a few times per year), and the cache would hold no more than 1000 entities (say, Java objects).

2) The event load we are looking at is around 10-50k/sec.

We are using the broadcast mechanism for this.

Prasanna.

In reply to the post by Navneeth Krishnan of Thu 26 Nov, 2020, 14:01

Re: Caching

Dongwon Kim-2
In reply to this post by Navneeth Krishnan
Hi Navneeth,

I reported an issue similar to yours before [1], although I took the broadcasting approach at first.

As you already anticipated, broadcasting is going to use more memory than your current approach based on a static object on each TM.

The broadcast data is also treated as operator state and is periodically checkpointed, which brings serialization overhead and garbage collection.
These costs are not negligible at all unless you choose the serialization strategy carefully, as explained in [2].
Even with a proper one, I've experienced mild back pressure whenever
- a checkpoint is in progress (AFAIK, incremental checkpointing does not apply to operator state)
- the cache is being broadcast
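
To give a rough feel for why the serialization strategy matters, the sketch below compares the byte size of the same lookup table under generic JDK object serialization and under a hand-written encoding. This is only an illustration: it does not use Flink's serializers (Flink has its own type serializers and falls back to Kryo for generic types), and `SnapshotSizeDemo` and the sample data are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

final class SnapshotSizeDemo {
    // Builds a hypothetical lookup table: id -> attribute string.
    static Map<Integer, String> sampleTable(int entries) {
        Map<Integer, String> table = new HashMap<>();
        for (int i = 0; i < entries; i++) {
            table.put(i, "attr-" + i);
        }
        return table;
    }

    // Generic JDK object serialization of the whole map.
    static int javaSerializedSize(Map<Integer, String> table) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(table);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Hand-written encoding: entry count, then (int key, UTF string) pairs.
    static int compactSize(Map<Integer, String> table) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(table.size());
            for (Map.Entry<Integer, String> e : table.entrySet()) {
                out.writeInt(e.getKey());
                out.writeUTF(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }
}
```

Whatever the serializer, the whole table is written again on every checkpoint and by every parallel instance that holds a copy, so a smaller encoding reduces the cost per checkpoint but does not remove it.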

For that reason, I decided to populate the data in Redis, but that also calls for design decisions:
- which Java client to use? Jedis [3]? Lettuce [4]?
- how to invoke API calls inside Flink? synchronously or asynchronously?

Currently I'm very satisfied with Lettuce combined with Flink's async I/O [5]: the memory footprint is very small, and I don't have to worry about serialization overhead or garbage collection.
Lettuce supports asynchronous communication, so it works perfectly with Flink's async I/O.
I bet you'll be very disappointed if you invoke Jedis synchronously inside a ProcessFunction.
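
The reason synchronous calls tend to disappoint can be seen with a back-of-the-envelope bound: a subtask cannot complete more lookups per second than the number of requests it keeps in flight divided by the round-trip time. The helper below is a hypothetical illustration; the 1 ms round trip and the 100 in-flight requests in the usage note are assumed figures, not measurements from this thread.

```java
final class LookupThroughput {
    private LookupThroughput() {}

    // Upper bound on lookups per second for one subtask:
    // requests in flight divided by the round-trip time (Little's law).
    static double maxLookupsPerSecond(int inFlight, double roundTripMillis) {
        return inFlight * 1000.0 / roundTripMillis;
    }
}
```

With an assumed 1 ms round trip, a blocking client (one request in flight) is capped at 1,000 lookups per second per subtask, so 200k events per second would need on the order of 200 parallel subtasks. Keeping 100 requests in flight asynchronously raises the same bound to 100,000 per subtask, ignoring limits on the Redis side.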

Best,

Dongwon



Re: Caching

Dongwon Kim-2
Oops, I forgot to mention that when doing a bulk insert into Redis, you'd better open a pipeline with the 'transaction' property set to False [1].

Otherwise, API calls from your Flink job will time out.


In reply to the post by Dongwon Kim of Thu, Nov 26, 2020 at 11:09 PM

Re: Caching

Navneeth Krishnan
Thanks Dongwon. It was extremely helpful. I didn't quite understand how async io can be used here. It would be great if you can share some info on it.

Also how are you propagating any changes to values?

Regards,
Navneeth

In reply to the post by Dongwon Kim of Thu, Nov 26, 2020 at 6:26 AM

Re: Caching

Dongwon Kim-2
Hi Navneeth,

> I didn't quite understand how async io can be used here. It would be great if you can share some info on it.
You need to add an async operator in the middle of your pipeline in order to enrich your input data. [1] and [2] will help you.
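
The idea behind such an async enrichment step can be sketched with plain `CompletableFuture`s, without any Flink or Redis dependency. In an actual job this logic would presumably live in a Flink `AsyncFunction` (completing the `ResultFuture` from the client's callback) wired in with `AsyncDataStream.unorderedWait` or `orderedWait`; here `AsyncEnrichmentSketch`, `lookupAsync`, and the in-memory `store` are hypothetical stand-ins for the operator, the asynchronous client call, and Redis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class AsyncEnrichmentSketch {
    private AsyncEnrichmentSketch() {}

    // Stand-in for an asynchronous client call: returns a future immediately.
    static CompletableFuture<String> lookupAsync(Map<String, String> store, String key) {
        return CompletableFuture.supplyAsync(() -> store.getOrDefault(key, "unknown"));
    }

    // Enriches each road ID with its looked-up attribute without blocking per event.
    static List<String> enrichAll(List<String> roadIds, Map<String, String> store) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String roadId : roadIds) {
            // Issue the request and attach a callback; do not wait here.
            pending.add(lookupAsync(store, roadId)
                    .thenApply(attr -> roadId + ":" + attr));
        }
        // Collect the results in input order once the lookups have completed.
        List<String> enriched = new ArrayList<>();
        for (CompletableFuture<String> f : pending) {
            enriched.add(f.join());
        }
        return enriched;
    }
}
```

The point of the pattern is that many lookups are in flight at once, so the operator's throughput is no longer tied to the latency of a single request.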

> Also how are you propagating any changes to values?
I need to maintain a mapping from road ID to various attributes of each road, and the mapping is updated every week.
I use keys for versioning, and I use a Hash [3] as the value in order to store each mapping.
When a new mapping is prepared, I upload it under a fresh key while the previous version is still being served to Flink (via async I/O).
Such concurrent reads and writes are possible in Redis when you turn off transactions when creating the Redis client's pipeline.
When the new mapping is completely uploaded, I inform my Flink pipeline of the new mapping via Kafka.
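
The versioning scheme described above can be sketched in memory as follows. This is an illustration under assumptions, not the actual setup: `VersionedMappingStore` stands in for Redis (versioned key to hash), `activate` stands in for the notification that arrives via Kafka, and key names such as `roads:v1` are made up.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

final class VersionedMappingStore {
    // Stand-in for Redis: versioned key -> hash of (roadId -> attributes).
    private final Map<String, Map<String, String>> hashes = new ConcurrentHashMap<>();
    // The key that readers currently use; switched only after an upload completes.
    private final AtomicReference<String> activeKey = new AtomicReference<>();

    // Writer side: upload a complete mapping under a fresh key.
    void upload(String versionedKey, Map<String, String> mapping) {
        hashes.put(versionedKey, new ConcurrentHashMap<>(mapping));
    }

    // Stand-in for the "new mapping is ready" message sent to the pipeline.
    void activate(String versionedKey) {
        if (!hashes.containsKey(versionedKey)) {
            throw new IllegalArgumentException("unknown version: " + versionedKey);
        }
        activeKey.set(versionedKey);
    }

    // Reader side: look up a road in whichever version is currently active.
    String lookup(String roadId) {
        String key = activeKey.get();
        return key == null ? null : hashes.get(key).get(roadId);
    }
}
```

Readers keep seeing the old version until the switch, so a half-uploaded mapping is never served.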


Best,

Dongwon

In reply to the post by Navneeth Krishnan of Fri, Nov 27, 2020 at 4:31 PM