Caching Mechanism in Flink

Caching Mechanism in Flink

Jack Kolokasis
Hello all,

I am new to Flink and would like to ask whether Flink supports a caching
mechanism that stores intermediate results in memory for machine-learning
workloads.

If so, how can I enable and use it?

Thank you,
Iacovos

Re: Caching Mechanism in Flink

Xuannan Su
Hi Jack,

At the moment, Flink doesn't support caching intermediate results. However, there is ongoing effort to add caching to Flink.
FLIP-36[1] proposes adding a caching mechanism to the Table API, and it is planned for 1.13.

Best,
Xuannan

In reply to Jack Kolokasis (Nov 10, 2020, 4:29 AM +0800).

Re: Caching Mechanism in Flink

Jack Kolokasis

Thank you for the reply, Xuannan.

I also want to ask how Flink uses off-heap memory. If I set taskmanager.memory.task.off-heap.size, which data does Flink allocate off-heap? Is this handled by the programmer?

Best,
Iacovos

In reply to Xuannan Su (10/11/20, 4:42 a.m.).

Re: Caching Mechanism in Flink

Matthias
Hi Iacovos,
The task's off-heap configuration value is used when spinning up TaskManager containers in a clustered environment: it contributes to the overall memory reserved for a TaskManager container during deployment. You can use this parameter to influence the amount of memory reserved if your user code relies on DirectByteBuffers and/or native memory allocation. Beyond that, Flink does no active management of this memory pool. The parameter is ignored if you run a Flink cluster locally.

Besides this, Flink itself also uses DirectByteBuffers (for network buffers) and native memory (for its internally used managed memory).
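As a Flink-free illustration of the kind of user code the task off-heap budget is meant to cover, here is a minimal plain-Java sketch. The class OffHeapScratch is hypothetical and not part of Flink; it only allocates a buffer with ByteBuffer.allocateDirect, which the JVM charges against its direct-memory limit (-XX:MaxDirectMemorySize) rather than against the heap.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical user-code helper (not a Flink API) that keeps a scratch area
// outside the JVM heap. Allocations like this are what
// taskmanager.memory.task.off-heap.size is meant to leave room for.
final class OffHeapScratch {
    private final ByteBuffer buffer;

    OffHeapScratch(int capacityBytes) {
        // allocateDirect reserves memory in the JVM's direct memory pool,
        // which is bounded by -XX:MaxDirectMemorySize, not by -Xmx.
        this.buffer = ByteBuffer.allocateDirect(capacityBytes).order(ByteOrder.nativeOrder());
    }

    boolean isOffHeap() {
        return buffer.isDirect();
    }

    int capacity() {
        return buffer.capacity();
    }

    // Writes a double into the given slot and reads it back (absolute indexing).
    double roundTrip(int slot, double value) {
        buffer.putDouble(slot * Double.BYTES, value);
        return buffer.getDouble(slot * Double.BYTES);
    }

    public static void main(String[] args) {
        OffHeapScratch scratch = new OffHeapScratch(1024);
        System.out.println(scratch.isOffHeap() + " " + scratch.capacity());
    }
}
```

Since Flink does not actively manage such allocations, the idea is to size the task off-heap option so that the container reservation covers them.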

You can find a more detailed description of Flink's memory model in [1]. I hope that helps.

Best,
Matthias


In reply to Jack Kolokasis (Tue, Nov 10, 2020, 3:57 AM).

Re: Caching Mechanism in Flink

Jack Kolokasis

Hi Matthias,

Thank you for your reply and the useful information. I found that off-heap memory is used when Flink uses HybridMemorySegments. How does Flink know when to use these HybridMemorySegments, and in which operations does this happen?

Best,
Iacovos

In reply to Matthias Pohl (11/11/20, 11:41 a.m.).

Re: Caching Mechanism in Flink

Matthias
When you mention "off-heap" in your most recent message, are you still referring to the task's off-heap configuration value? AFAIK, the HybridMemorySegment shouldn't be directly related to the off-heap parameter.

The HybridMemorySegment can be used as a wrapper around any kind of memory: heap memory (e.g. a byte[]), DirectByteBuffers (located in the JVM's direct memory pool, which is not part of the JVM heap), or memory allocated through Unsafe's allocation methods (so-called native memory, which is also not part of the JVM heap).
HybridMemorySegments are used within the MemoryManager class. MemoryManager instances are responsible for maintaining the managed memory used in each of the TaskSlots. Managed memory is used in different settings (e.g. for the RocksDB state backend in streaming applications). It can be configured using taskmanager.memory.managed.size (or the corresponding taskmanager.memory.managed.fraction parameter) [1]. See [2] for more details.
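A heavily simplified, hypothetical stand-in can make the "wrapper around any kind of memory" idea concrete. SimpleSegment below is not Flink's HybridMemorySegment: it only shows how one class can offer the same read/write methods whether the bytes live in a heap byte[] or in a direct ByteBuffer. The real class also covers Unsafe-allocated native memory, which this sketch leaves out.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified stand-in for a memory segment (not Flink's API).
// Callers use the same methods regardless of where the bytes are stored.
final class SimpleSegment {
    private final ByteBuffer view; // either heap-backed or direct

    private SimpleSegment(ByteBuffer view) {
        this.view = view;
    }

    // Wraps an existing heap array (on-heap memory).
    static SimpleSegment wrapHeap(byte[] array) {
        return new SimpleSegment(ByteBuffer.wrap(array));
    }

    // Allocates memory in the JVM's direct memory pool (off-heap).
    static SimpleSegment allocateDirect(int size) {
        return new SimpleSegment(ByteBuffer.allocateDirect(size));
    }

    boolean isOffHeap() {
        return view.isDirect();
    }

    int size() {
        return view.capacity();
    }

    void putInt(int index, int value) {
        view.putInt(index, value);
    }

    int getInt(int index) {
        return view.getInt(index);
    }

    public static void main(String[] args) {
        SimpleSegment onHeap = wrapHeap(new byte[64]);
        SimpleSegment offHeap = allocateDirect(64);
        // Identical access code for both kinds of memory.
        onHeap.putInt(0, 42);
        offHeap.putInt(0, 42);
        System.out.println(onHeap.getInt(0) + " " + offHeap.getInt(0));
        System.out.println(onHeap.isOffHeap() + " " + offHeap.isOffHeap());
    }
}
```

Hiding the storage kind behind one interface is what lets a memory manager hand out pages without the operators that use them caring where the pages live.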

I'm going to pull in Andrey as he has worked on that topic recently.

Best,
Matthias


In reply to Jack Kolokasis (Wed, Nov 11, 2020, 12:00 PM).

Re: Caching Mechanism in Flink

Jack Kolokasis

Hi Matthias,

Yes, I am referring to the task's off-heap configuration value.

Best,
Iacovos

In reply to Matthias Pohl (11/11/20, 1:37 p.m.).

Re: Caching Mechanism in Flink

Andrey Zagrebin
Hi Iacovos,

As Matthias mentioned, the task's off-heap memory has nothing to do with the memory segments. This memory component is reserved exclusively for user code.

The memory segments are managed by Flink and used for batch workloads, such as in-memory joins.
They are part of managed memory (taskmanager.memory.managed.size),
which is also off-heap, but is neither the task's off-heap memory (taskmanager.memory.task.off-heap.size) nor JVM direct memory.
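Assuming the defaults I believe apply to the Flink 1.10+ memory model (an explicit taskmanager.memory.managed.size takes precedence; otherwise taskmanager.memory.managed.fraction, default 0.4, is applied to total Flink memory; pages have the size of taskmanager.memory.segment-size, default 32 KiB), the sizing of managed memory can be sketched as plain arithmetic. The even split across slots is a further assumption (no fine-grained resource profiles), and ManagedMemoryBudget is a hypothetical name, not a Flink class.

```java
// Hypothetical helper (not a Flink API) mirroring how managed memory might be
// sized and split per slot, under the assumptions stated above.
final class ManagedMemoryBudget {
    static final double DEFAULT_FRACTION = 0.4;       // assumed managed.fraction default
    static final long DEFAULT_PAGE_SIZE = 32L * 1024; // assumed segment-size default (32 KiB)

    // explicitSizeBytes < 0 means "taskmanager.memory.managed.size is not set".
    static long managedBytes(long totalFlinkMemoryBytes, long explicitSizeBytes, double fraction) {
        if (explicitSizeBytes >= 0) {
            return explicitSizeBytes;
        }
        return (long) (totalFlinkMemoryBytes * fraction);
    }

    // Assumes managed memory is divided evenly among the TaskManager's slots.
    static long perSlotBytes(long managedBytes, int numberOfSlots) {
        return managedBytes / numberOfSlots;
    }

    // Number of whole pages (memory segments) one slot's pool could hand out.
    static long pagesPerSlot(long perSlotBytes, long pageSizeBytes) {
        return perSlotBytes / pageSizeBytes;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024;
        long managed = managedBytes(1024 * mib, -1, DEFAULT_FRACTION);
        long perSlot = perSlotBytes(managed, 4);
        System.out.println(managed / mib + " MiB managed, "
                + pagesPerSlot(perSlot, DEFAULT_PAGE_SIZE) + " pages per slot");
    }
}
```

The point of the sketch is only that this budget is carved out separately from the task off-heap option; the exact rounding Flink applies may differ.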

The memory segments are also used to wrap network buffers. Those live in JVM direct memory (which is also off-heap), but again this has nothing to do with the task's off-heap memory.

Maybe the confusion comes from the fact that 'off-heap' generally refers to everything that is not JVM heap: direct or native memory.
The task's off-heap memory is the part of the general 'off-heap' (of the direct memory limit, to be precise) that is reserved exclusively for user code and is not intended to be used by Flink.
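The distinction above comes down to which components count toward the JVM's direct-memory limit. To my understanding of the Flink 1.10+ memory model, Flink sets -XX:MaxDirectMemorySize to framework off-heap + task off-heap + network memory, while managed memory is native memory outside that limit. The sketch below encodes that assumption with hypothetical names; please verify it against the memory documentation for your Flink version.

```java
// Hypothetical model (not a Flink API) of which memory components count
// toward the JVM direct memory limit, per the assumption stated above.
final class DirectMemoryLimit {
    enum Component { FRAMEWORK_OFF_HEAP, TASK_OFF_HEAP, NETWORK, MANAGED }

    // Managed memory is native (Unsafe-allocated) and therefore excluded.
    static boolean countsTowardDirectLimit(Component c) {
        return c != Component.MANAGED;
    }

    static long maxDirectMemoryBytes(long frameworkOffHeap, long taskOffHeap, long network) {
        return frameworkOffHeap + taskOffHeap + network;
    }

    // Renders the derived limit as a JVM flag, e.g. for comparing with a launch command.
    static String jvmFlag(long frameworkOffHeap, long taskOffHeap, long network) {
        return "-XX:MaxDirectMemorySize=" + maxDirectMemoryBytes(frameworkOffHeap, taskOffHeap, network);
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024;
        // Example sizes only: 128 MiB framework off-heap, 64 MiB task off-heap, 256 MiB network.
        System.out.println(jvmFlag(128 * mib, 64 * mib, 256 * mib));
        for (Component c : Component.values()) {
            System.out.println(c + " counts toward the direct limit: " + countsTowardDirectLimit(c));
        }
    }
}
```

Under this model, raising the task off-heap option enlarges the direct-memory limit available to user code without changing the managed memory that backs Flink's memory segments.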

Best,
Andrey

In reply to Jack Kolokasis (Wed, Nov 11, 2020, 3:06 PM).