Using redis cache in flink


Navneeth Krishnan
Hi All,

I want to use Redis as a near/far cache to store data that is common across slots, i.e. to share data across slots. This data is required for processing every single message, and I think it is better to keep it in an in-memory cache backed by Redis than in RocksDB, since with RocksDB it has to be serialized for every single get call. Do you think this is a good solution, or is there a better one? Also, is there any reference on how I can create a centralized near/far cache, given that the job and operators are distributed by the job manager?

Thanks
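
For readers unfamiliar with the term, the near/far arrangement described above can be sketched as a local map (near) that falls back to a remote lookup (far) on a miss. This is only an illustration of the idea, not code from the thread: the class name `NearFarCache` and the `far` function, which stands in for a Redis GET plus deserialization, are assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Two-level read-through cache: a local map (near) in front of a remote lookup (far). */
final class NearFarCache<K, V> {
    private final Map<K, V> near = new ConcurrentHashMap<>();
    // Hypothetical stand-in for a Redis GET plus deserialization.
    private final Function<K, V> far;

    NearFarCache(Function<K, V> far) {
        this.far = far;
    }

    /** Returns the locally cached value; calls the far lookup only on a near-cache miss. */
    V get(K key) {
        return near.computeIfAbsent(key, far);
    }

    /** Drops the local entry so that the next get() reloads it from the far cache. */
    void invalidate(K key) {
        near.remove(key);
    }
}
```

The near map here is unbounded and never expires entries; a real cache would probably need a size limit and an invalidation strategy, both of which are left out of this sketch.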
Re: Using redis cache in flink

Yun Tang
Hi Navneeth

If you wrap Redis as a state backend, you cannot easily share data across slots, because Flink constructs a state backend per operator and it is accessed only by the local thread.

If you use a Redis cluster as an external service to store your data, you can share data across slots easily. However, the network communication this introduces cannot be ignored when weighed against the reduced cost of serialization. There is a trade-off here, and we cannot guarantee a performance gain. Actually, I believe the time spent on CPU serialization is much less than the time consumed by the network.

Best
Yun Tang

From: Navneeth Krishnan <[hidden email]>
Sent: Wednesday, January 8, 2020 12:33
To: user <[hidden email]>
Subject: Using redis cache in flink
 
Re: Using redis cache in flink

Navneeth Krishnan
Hi Yun,

Thanks. I want to use Redis as a cache, not as a state backend. I would still use the RocksDB state backend for other state. The reason to use a cache instead of managed state is that I’d get around 10k msgs per task slot, and with a cache I don’t have to fetch the state from RocksDB for each lookup. An in-memory cache would be fine, but I want to use Redis to rebuild it.

Regards

On Tue, Jan 7, 2020 at 11:21 PM Yun Tang <[hidden email]> wrote:
Re: Using redis cache in flink

Yun Tang
In reply to this post by Navneeth Krishnan
Hi Navneeth

If you need the Redis cache to be fault tolerant, I am afraid you have to choose a Redis cluster, since after a job failover Flink might deploy the task on a different node from the previous one.

If you don't care about fault tolerance, you could implement a custom operator that launches Redis.

By the way, there is a way to combine on-heap objects in memory with the checkpoint mechanism to ensure fault tolerance; you could refer to [1] and [2]. The basic idea is to cache data in memory as a buffer and ensure it is stored in each checkpoint.


Best
Yun Tang
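
The links [1] and [2] are not reproduced in this archive, so the following is not the referenced code. It is a minimal plain-Java sketch of the idea in the last paragraph: keep the data on the heap, copy it out at each checkpoint, and load it back after a failover. In Flink the corresponding hooks would be `CheckpointedFunction#snapshotState` and `CheckpointedFunction#initializeState`; the class name `CheckpointedHeapCache` and the methods `snapshot()` and `restore()` are hypothetical stand-ins that only model the data flow.

```java
import java.util.HashMap;
import java.util.Map;

/** On-heap cache whose contents are copied at each checkpoint and reloaded on restore. */
final class CheckpointedHeapCache {
    private final Map<String, String> cache = new HashMap<>();

    void put(String key, String value) {
        cache.put(key, value);
    }

    String get(String key) {
        return cache.get(key);
    }

    /** Would be called from the snapshot hook: returns a copy that later writes cannot change. */
    Map<String, String> snapshot() {
        return new HashMap<>(cache);
    }

    /** Would be called from the initialization hook after a failover. */
    void restore(Map<String, String> snapshot) {
        cache.clear();
        cache.putAll(snapshot);
    }
}
```

Lookups on the hot path read the plain `HashMap` without serialization; the copying cost is paid only when a checkpoint is taken.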


From: Navneeth Krishnan <[hidden email]>
Sent: Wednesday, January 8, 2020 15:36
To: Yun Tang <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Using redis cache in flink
 
Re: Using redis cache in flink

Navneeth Krishnan
In reply to this post by Yun Tang
Hi Yun,

Thanks for the update. I can definitely use a Redis cluster, but what I don't understand is this: if I use a custom operator, the Redis cache will be instantiated per operator instance. What I would ideally like to have is one Redis cache instance per TM JVM. Since there isn't any way to share data between task slots in Flink today, I would like to use this approach to share common data. What I'm not sure about is how I can ensure that just one cache instance is created per TM JVM.

Regards

On Wed, Jan 8, 2020 at 12:46 AM Yun Tang <[hidden email]> wrote:
Re: Using redis cache in flink

Yun Tang
Hi Navneeth

You cannot easily create a single dedicated instance per TM, since Flink does not allow user-defined objects to be bound to the life cycle of the task manager. However, you can make all operators of the same class share a single object when the operators are initialized. You could use a static variable with an atomic reference or synchronization, initialize it in RichFunction#open, and remember to release the resources in RichFunction#close.

Best
Yun Tang
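
One way to read the suggestion above is a reference-counted holder kept in a static field. The sketch below is plain Java under that assumption and is not taken from the thread: `SharedResource`, `acquire()` and `release()` are hypothetical names. `acquire()` would be called from `RichFunction#open` and `release()` from `RichFunction#close`.

```java
import java.util.function.Supplier;

/** Hands one shared instance to every caller and closes it when the last user releases it. */
final class SharedResource<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private T instance; // guarded by "this"
    private int users;  // guarded by "this"

    SharedResource(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Call from open(): creates the instance on first use, then reuses it. */
    synchronized T acquire() {
        if (users == 0) {
            instance = factory.get();
        }
        users++;
        return instance;
    }

    /** Call from close(): closes the instance once the last user has released it. */
    synchronized void release() {
        if (users == 0) {
            return; // unbalanced release, nothing to do
        }
        if (--users == 0) {
            T toClose = instance;
            instance = null;
            try {
                toClose.close();
            } catch (Exception e) {
                throw new IllegalStateException("failed to close shared instance", e);
            }
        }
    }
}
```

A function class would hold something like `private static final SharedResource<MyCache> SHARED = new SharedResource<>(MyCache::new);`, where `MyCache` is a placeholder for the actual cache client. Note that a static field is scoped to the class loader that loaded the class, so the instance is shared only among tasks whose user code comes from the same class loader in that JVM.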

From: Navneeth Krishnan <[hidden email]>
Sent: Monday, January 13, 2020 11:22
To: Yun Tang <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Using redis cache in flink
 